From 5c6b38aae39e5cc9aac2c922ab991c7ad4b672cf Mon Sep 17 00:00:00 2001 From: Divya Madala Date: Mon, 18 Mar 2024 15:24:26 -0700 Subject: [PATCH] Run benchmark tests agianst a given cluster endpoint Signed-off-by: Divya Madala --- .../benchmark_test/benchmark_args.py | 12 +- .../benchmark_create_cluster.py | 156 ++++++++++++++++ .../benchmark_test/benchmark_test_cluster.py | 167 +++--------------- .../benchmark_test/benchmark_test_runner.py | 9 +- .../benchmark_test_runner_opensearch.py | 26 ++- .../benchmark_test/benchmark_test_suite.py | 14 +- .../test_benchmark_args.py | 4 +- .../test_benchmark_create_cluster.py | 118 +++++++++++++ .../test_benchmark_test_cluster.py | 157 +++++++--------- .../test_benchmark_test_cluster_min.py | 18 +- .../test_benchmark_test_runner_opensearch.py | 59 ++++++- .../test_benchmark_test_suite.py | 7 +- 12 files changed, 460 insertions(+), 287 deletions(-) create mode 100644 src/test_workflow/benchmark_test/benchmark_create_cluster.py create mode 100644 tests/tests_test_workflow/test_benchmark_workflow/benchmark_test/test_benchmark_create_cluster.py diff --git a/src/test_workflow/benchmark_test/benchmark_args.py b/src/test_workflow/benchmark_test/benchmark_args.py index ec054b8944..0ceccabf5f 100644 --- a/src/test_workflow/benchmark_test/benchmark_args.py +++ b/src/test_workflow/benchmark_test/benchmark_args.py @@ -18,6 +18,7 @@ # Contains the arguments required to run a perf test. class BenchmarkArgs: bundle_manifest: IO + cluster_endpoint: str distribution_url: str distribution_version: str stack_suffix: str @@ -54,12 +55,14 @@ def __init__(self) -> None: parser = argparse.ArgumentParser(description="Test an OpenSearch Bundle") parser.add_argument("--bundle-manifest", type=argparse.FileType("r"), help="Bundle Manifest file.") parser.add_argument("--distribution-url", dest="distribution_url", help="Link to a downloadable OpenSearch tarball.") + parser.add_argument("--cluster-endpoint", dest="cluster_endpoint", + help="Load balancer url for benchmark testing") parser.add_argument("--distribution-version", dest="distribution_version", help="provide OpenSearch version if using distribution-url param.") parser.add_argument("--suffix", dest="suffix", help="Suffix to be added to stack name for performance test") parser.add_argument("--component", dest="component", default="OpenSearch", help="Component name that needs to be performance tested") - parser.add_argument("--config", type=argparse.FileType("r"), help="Config file.", required=True) + parser.add_argument("--config", type=argparse.FileType("r"), help="Config file.") parser.add_argument("--without-security", dest="insecure", action="store_true", help="Force the security of the cluster to be disabled.", default=False) parser.add_argument("--keep", dest="keep", action="store_true", @@ -121,6 +124,7 @@ def __init__(self) -> None: args = parser.parse_args() self.bundle_manifest = args.bundle_manifest if args.bundle_manifest else None self.distribution_url = args.distribution_url if args.distribution_url else None + self.cluster_endpoint = args.cluster_endpoint if args.cluster_endpoint else None self.distribution_version = args.distribution_version if args.distribution_version else None self.stack_suffix = args.suffix if args.suffix else None self.config = args.config @@ -152,7 +156,7 @@ def __init__(self) -> None: self.telemetry_params = args.telemetry_params if args.telemetry_params else None self.logging_level = args.logging_level - if self.bundle_manifest is None and self.distribution_url is None: - raise Exception('Please provide either --bundle-manifest or --distribution-url to run the performance test.') - elif self.distribution_url and self.distribution_version is None: + if self.bundle_manifest is None and self.distribution_url is None and self.cluster_endpoint is None: + raise Exception('Please provide either --bundle-manifest or --distribution-url or --cluster_endpoint to run the performance test.') + elif (self.distribution_url) and self.distribution_version is None: raise Exception("--distribution-version is required parameter while using --distribution-url param.") diff --git a/src/test_workflow/benchmark_test/benchmark_create_cluster.py b/src/test_workflow/benchmark_test/benchmark_create_cluster.py new file mode 100644 index 0000000000..abe245af6b --- /dev/null +++ b/src/test_workflow/benchmark_test/benchmark_create_cluster.py @@ -0,0 +1,156 @@ +# Copyright OpenSearch Contributors +# SPDX-License-Identifier: Apache-2.0 +# +# The OpenSearch Contributors require contributions made to +# this file be licensed under the Apache-2.0 license or a +# compatible open source license. + + +import json +import logging +import os +import subprocess +from contextlib import contextmanager +from typing import Any, Generator, Union + +from manifests.build_manifest import BuildManifest +from manifests.bundle_manifest import BundleManifest +from test_workflow.benchmark_test.benchmark_args import BenchmarkArgs +from test_workflow.benchmark_test.benchmark_test_cluster import BenchmarkTestCluster +from test_workflow.integ_test.utils import get_password + + +class BenchmarkCreateCluster(BenchmarkTestCluster): + manifest: Union[BundleManifest, BuildManifest] + work_dir: str + current_workspace: str + output_file: str + params: str + is_endpoint_public: bool + cluster_endpoint: str + + """ + Represents a performance test cluster. This class deploys the opensearch bundle with CDK. Supports both single + and multi-node clusters + """ + + def __init__( + self, + args: BenchmarkArgs, + bundle_manifest: Union[BundleManifest, BuildManifest], + config: dict, + current_workspace: str + ) -> None: + super().__init__(args) + self.manifest = bundle_manifest + self.current_workspace = current_workspace + self.output_file = "output.json" + role = config["Constants"]["Role"] + params_dict = self.setup_cdk_params(config) + params_list = [] + for key, value in params_dict.items(): + if value: + ''' + TODO: To send json input to typescript code from command line it needs to be enclosed in + single-quotes, this is a temp fix to achieve that since the quoted string passed from command line in + tesh.sh wrapper script gets un-quoted and we need to handle it here. + ''' + if key == 'additionalConfig': + params_list.append(f" -c {key}=\'{value}\'") + else: + params_list.append(f" -c {key}={value}") + role_params = ( + f" --require-approval=never --plugin cdk-assume-role-credential-plugin" + f" -c assume-role-credentials:writeIamRoleName={role} -c assume-role-credentials:readIamRoleName={role} " + ) + self.params = "".join(params_list) + role_params + self.is_endpoint_public = False + self.stack_name = f"opensearch-infra-stack-{self.args.stack_suffix}" + if self.manifest: + self.stack_name += f"-{self.manifest.build.id}-{self.manifest.build.architecture}" + + def start(self) -> None: + command = f"npm install && cdk deploy \"*\" {self.params} --outputs-file {self.output_file}" + + logging.info(f'Executing "{command}" in {os.getcwd()}') + subprocess.check_call(command, cwd=os.getcwd(), shell=True) + with open(self.output_file, "r") as read_file: + load_output = json.load(read_file) + self.create_endpoint(load_output) + self.wait_for_processing() + logging.info("Wait for processing executed") + + def create_endpoint(self, cdk_output: dict) -> None: + load_balancer_url = cdk_output[self.stack_name].get('loadbalancerurl', None) + if load_balancer_url is None: + raise RuntimeError("Unable to fetch the cluster endpoint from cdk output") + self.args.cluster_endpoint = load_balancer_url + self.cluster_endpoint_with_port = "".join([load_balancer_url, ":", str(self.port)]) + + def terminate(self) -> None: + command = f"cdk destroy {self.stack_name} {self.params} --force" + logging.info(f'Executing "{command}" in {os.getcwd()}') + + subprocess.check_call(command, cwd=os.getcwd(), shell=True) + + def setup_cdk_params(self, config: dict) -> dict: + suffix = '' + if self.args.stack_suffix and self.manifest: + suffix = self.args.stack_suffix + '-' + self.manifest.build.id + '-' + self.manifest.build.architecture + elif self.manifest: + suffix = self.manifest.build.id + '-' + self.manifest.build.architecture + elif self.args.stack_suffix: + suffix = self.args.stack_suffix + + if self.manifest: + self.args.distribution_version = self.manifest.build.version + artifact_url = self.manifest.build.location if isinstance(self.manifest, BundleManifest) else \ + f"https://artifacts.opensearch.org/snapshots/core/opensearch/{self.manifest.build.version}/opensearch-min-" \ + f"{self.manifest.build.version}-linux-{self.manifest.build.architecture}-latest.tar.gz" + else: + artifact_url = self.args.distribution_url.strip() + + return { + "distributionUrl": artifact_url, + "vpcId": config["Constants"]["VpcId"], + "account": config["Constants"]["AccountId"], + "region": config["Constants"]["Region"], + "suffix": suffix, + "securityDisabled": str(self.args.insecure).lower(), + "adminPassword": None if self.args.insecure else get_password(self.args.distribution_version), + "cpuArch": self.manifest.build.architecture if self.manifest else 'x64', + "singleNodeCluster": str(self.args.single_node).lower(), + "distVersion": self.args.distribution_version, + "minDistribution": str(self.args.min_distribution).lower(), + "serverAccessType": config["Constants"]["serverAccessType"], + "restrictServerAccessTo": config["Constants"]["restrictServerAccessTo"], + "additionalConfig": self.args.additional_config, + "dataInstanceType": self.args.data_instance_type, + "managerNodeCount": self.args.manager_node_count, + "dataNodeCount": self.args.data_node_count, + "clientNodeCount": self.args.client_node_count, + "ingestNodeCount": self.args.ingest_node_count, + "mlNodeCount": self.args.ml_node_count, + "dataNodeStorage": self.args.data_node_storage, + "mlNodeStorage": self.args.ml_node_storage, + "jvmSysProps": self.args.jvm_sys_props, + "use50PercentHeap": str(self.args.use_50_percent_heap).lower(), + "isInternal": config["Constants"]["isInternal"], + "enableRemoteStore": str(self.args.enable_remote_store).lower(), + "customRoleArn": config["Constants"]["IamRoleArn"] + } + + @classmethod + @contextmanager + def create(cls, *args: Any) -> Generator[Any, None, None]: + """ + Set up the cluster. When this method returns, the cluster must be available to take requests. + Throws ClusterCreationException if the cluster could not start for some reason. If this exception is thrown, the caller does not need to call "destroy". + """ + cluster = cls(*args) + + try: + cluster.start() + yield cluster + finally: + cluster.terminate() diff --git a/src/test_workflow/benchmark_test/benchmark_test_cluster.py b/src/test_workflow/benchmark_test/benchmark_test_cluster.py index 59d318c289..42e29f39c0 100644 --- a/src/test_workflow/benchmark_test/benchmark_test_cluster.py +++ b/src/test_workflow/benchmark_test/benchmark_test_cluster.py @@ -8,94 +8,46 @@ import json import logging -import os import subprocess -from contextlib import contextmanager -from typing import Any, Generator, Union import requests -import semver from requests.auth import HTTPBasicAuth from retry.api import retry_call # type: ignore -from manifests.build_manifest import BuildManifest -from manifests.bundle_manifest import BundleManifest from test_workflow.benchmark_test.benchmark_args import BenchmarkArgs +from test_workflow.integ_test.utils import get_password class BenchmarkTestCluster: - manifest: Union[BundleManifest, BuildManifest] - work_dir: str - current_workspace: str args: BenchmarkArgs - output_file: str - params: str - is_endpoint_public: bool - cluster_endpoint: str cluster_endpoint_with_port: str - - """ - Represents a performance test cluster. This class deploys the opensearch bundle with CDK. Supports both single - and multi-node clusters - """ + password: str def __init__( self, - bundle_manifest: Union[BundleManifest, BuildManifest], - config: dict, - args: BenchmarkArgs, - current_workspace: str + args: BenchmarkArgs + ) -> None: - self.manifest = bundle_manifest - self.current_workspace = current_workspace self.args = args - self.output_file = "output.json" - role = config["Constants"]["Role"] - params_dict = self.setup_cdk_params(config) - params_list = [] - for key, value in params_dict.items(): - if value: - ''' - TODO: To send json input to typescript code from command line it needs to be enclosed in - single-quotes, this is a temp fix to achieve that since the quoted string passed from command line in - tesh.sh wrapper script gets un-quoted and we need to handle it here. - ''' - if key == 'additionalConfig': - params_list.append(f" -c {key}=\'{value}\'") - else: - params_list.append(f" -c {key}={value}") - role_params = ( - f" --require-approval=never --plugin cdk-assume-role-credential-plugin" - f" -c assume-role-credentials:writeIamRoleName={role} -c assume-role-credentials:readIamRoleName={role} " - ) - self.params = "".join(params_list) + role_params - self.is_endpoint_public = False - self.cluster_endpoint = None self.cluster_endpoint_with_port = None - self.stack_name = f"opensearch-infra-stack-{self.args.stack_suffix}" - if self.manifest: - self.stack_name += f"-{self.manifest.build.id}-{self.manifest.build.architecture}" + self.password = None def start(self) -> None: - command = f"npm install && cdk deploy \"*\" {self.params} --outputs-file {self.output_file}" + command = f"curl -X GET http://{self.args.cluster_endpoint}" if self.args.insecure else f"curl -X GET https://{self.args.cluster_endpoint} -u 'admin:{get_password('2.12.0')}' --insecure" + try: + result = subprocess.run(command, shell=True, capture_output=True, timeout=5) + except subprocess.TimeoutExpired: + raise TimeoutError(f"Time out! Couldn't connect to the cluster {self.args.cluster_endpoint}") - logging.info(f'Executing "{command}" in {os.getcwd()}') - subprocess.check_call(command, cwd=os.getcwd(), shell=True) - with open(self.output_file, "r") as read_file: - load_output = json.load(read_file) - self.create_endpoint(load_output) + if result.stdout: + res_dict = json.loads(result.stdout) + self.args.distribution_version = res_dict['version']['number'] self.wait_for_processing() - - def create_endpoint(self, cdk_output: dict) -> None: - loadbalancer_url = cdk_output[self.stack_name].get('loadbalancerurl', None) - if loadbalancer_url is None: - raise RuntimeError("Unable to fetch the cluster endpoint from cdk output") - self.cluster_endpoint = loadbalancer_url - self.cluster_endpoint_with_port = "".join([loadbalancer_url, ":", str(self.port)]) + self.cluster_endpoint_with_port = "".join([self.args.cluster_endpoint, ":", str(self.port)]) @property def endpoint(self) -> str: - return self.cluster_endpoint + return self.args.cluster_endpoint @property def endpoint_with_port(self) -> str: @@ -105,92 +57,17 @@ def endpoint_with_port(self) -> str: def port(self) -> int: return 80 if self.args.insecure else 443 - def terminate(self) -> None: - command = f"cdk destroy {self.stack_name} {self.params} --force" - logging.info(f'Executing "{command}" in {os.getcwd()}') + def get_distribution_version(self) -> str: + return self.args.distribution_version - subprocess.check_call(command, cwd=os.getcwd(), shell=True) + def fetch_password(self) -> str: + return self.password def wait_for_processing(self, tries: int = 3, delay: int = 15, backoff: int = 2) -> None: - # To-do: Make this better - password = 'admin' - if self.manifest: - if semver.compare(self.manifest.build.version, '2.12.0') != -1: - password = 'myStrongPassword123!' - else: - if semver.compare(self.args.distribution_version, '2.12.0') != -1: - password = 'myStrongPassword123!' - logging.info(f"Waiting for domain at {self.endpoint} to be up") protocol = "http://" if self.args.insecure else "https://" url = "".join([protocol, self.endpoint, "/_cluster/health"]) - request_args = {"url": url} if self.args.insecure else {"url": url, "auth": HTTPBasicAuth("admin", password), # type: ignore + self.password = None if self.args.insecure else get_password(self.args.distribution_version) + request_args = {"url": url} if self.args.insecure else {"url": url, "auth": HTTPBasicAuth("admin", self.password), # type: ignore "verify": False} # type: ignore - retry_call(requests.get, fkwargs=request_args, - tries=tries, delay=delay, backoff=backoff) - - def setup_cdk_params(self, config: dict) -> dict: - suffix = '' - need_strong_password = False - if self.args.stack_suffix and self.manifest: - suffix = self.args.stack_suffix + '-' + self.manifest.build.id + '-' + self.manifest.build.architecture - elif self.manifest: - suffix = self.manifest.build.id + '-' + self.manifest.build.architecture - elif self.args.stack_suffix: - suffix = self.args.stack_suffix - - if self.manifest: - artifact_url = self.manifest.build.location if isinstance(self.manifest, BundleManifest) else \ - f"https://artifacts.opensearch.org/snapshots/core/opensearch/{self.manifest.build.version}/opensearch-min-" \ - f"{self.manifest.build.version}-linux-{self.manifest.build.architecture}-latest.tar.gz" - if not self.args.insecure and semver.compare(self.manifest.build.version, '2.12.0') != -1: - need_strong_password = True - else: - artifact_url = self.args.distribution_url.strip() - if not self.args.insecure and semver.compare(self.args.distribution_version, '2.12.0') != -1: - need_strong_password = True - - return { - "distributionUrl": artifact_url, - "vpcId": config["Constants"]["VpcId"], - "account": config["Constants"]["AccountId"], - "region": config["Constants"]["Region"], - "suffix": suffix, - "securityDisabled": str(self.args.insecure).lower(), - "adminPassword": 'myStrongPassword123!' if need_strong_password else None, - "cpuArch": self.manifest.build.architecture if self.manifest else 'x64', - "singleNodeCluster": str(self.args.single_node).lower(), - "distVersion": self.manifest.build.version if self.manifest else self.args.distribution_version, - "minDistribution": str(self.args.min_distribution).lower(), - "serverAccessType": config["Constants"]["serverAccessType"], - "restrictServerAccessTo": config["Constants"]["restrictServerAccessTo"], - "additionalConfig": self.args.additional_config, - "dataInstanceType": self.args.data_instance_type, - "managerNodeCount": self.args.manager_node_count, - "dataNodeCount": self.args.data_node_count, - "clientNodeCount": self.args.client_node_count, - "ingestNodeCount": self.args.ingest_node_count, - "mlNodeCount": self.args.ml_node_count, - "dataNodeStorage": self.args.data_node_storage, - "mlNodeStorage": self.args.ml_node_storage, - "jvmSysProps": self.args.jvm_sys_props, - "use50PercentHeap": str(self.args.use_50_percent_heap).lower(), - "isInternal": config["Constants"]["isInternal"], - "enableRemoteStore": str(self.args.enable_remote_store).lower(), - "customRoleArn": config["Constants"]["IamRoleArn"] - } - - @classmethod - @contextmanager - def create(cls, *args: Any) -> Generator[Any, None, None]: - """ - Set up the cluster. When this method returns, the cluster must be available to take requests. - Throws ClusterCreationException if the cluster could not start for some reason. If this exception is thrown, the caller does not need to call "destroy". - """ - cluster = cls(*args) - - try: - cluster.start() - yield cluster - finally: - cluster.terminate() + retry_call(requests.get, fkwargs=request_args, tries=tries, delay=delay, backoff=backoff) diff --git a/src/test_workflow/benchmark_test/benchmark_test_runner.py b/src/test_workflow/benchmark_test/benchmark_test_runner.py index 7669c2d285..33063ab2a5 100644 --- a/src/test_workflow/benchmark_test/benchmark_test_runner.py +++ b/src/test_workflow/benchmark_test/benchmark_test_runner.py @@ -27,7 +27,7 @@ def __init__(self, args: BenchmarkArgs, test_manifest: Union[BundleManifest, Bui if self.test_manifest: self.security = "security" in self.test_manifest.components and not self.args.insecure else: - self.security = False + self.security = not self.args.insecure self.tests_dir = os.path.join(os.getcwd(), "test-results", "benchmark-test", f"{'with' if self.security else 'without'}-security") @@ -53,10 +53,3 @@ def get_git_ref(self) -> str: return 'main' else: return '1.x' - - def get_distribution_version(self) -> str: - os_version_float: float - if self.test_manifest: - return self.test_manifest.build.version - else: - return self.args.distribution_version diff --git a/src/test_workflow/benchmark_test/benchmark_test_runner_opensearch.py b/src/test_workflow/benchmark_test/benchmark_test_runner_opensearch.py index 3f4fa92826..56d0b5425c 100644 --- a/src/test_workflow/benchmark_test/benchmark_test_runner_opensearch.py +++ b/src/test_workflow/benchmark_test/benchmark_test_runner_opensearch.py @@ -18,6 +18,7 @@ from system.temporary_directory import TemporaryDirectory from system.working_directory import WorkingDirectory from test_workflow.benchmark_test.benchmark_args import BenchmarkArgs +from test_workflow.benchmark_test.benchmark_create_cluster import BenchmarkCreateCluster from test_workflow.benchmark_test.benchmark_test_cluster import BenchmarkTestCluster from test_workflow.benchmark_test.benchmark_test_runner import BenchmarkTestRunner from test_workflow.benchmark_test.benchmark_test_suite import BenchmarkTestSuite @@ -37,12 +38,19 @@ def get_cluster_repo_url(self) -> str: return "https://github.com/opensearch-project/opensearch-cluster-cdk.git" def run_tests(self) -> None: - config = yaml.safe_load(self.args.config) - - with TemporaryDirectory(keep=self.args.keep, chdir=True) as work_dir: - current_workspace = os.path.join(work_dir.name, "opensearch-cluster-cdk") - with GitRepository(self.get_cluster_repo_url(), self.get_git_ref(), current_workspace): - with WorkingDirectory(current_workspace): - with BenchmarkTestCluster.create(self.test_manifest, config, self.args, current_workspace) as test_cluster: - benchmark_test_suite = BenchmarkTestSuite(test_cluster.endpoint_with_port, self.security, self.get_distribution_version(), self.args) - retry_call(benchmark_test_suite.execute, tries=3, delay=60, backoff=2) + if self.args.cluster_endpoint: + cluster = BenchmarkTestCluster(self.args) + cluster.start() + benchmark_test_suite = BenchmarkTestSuite(cluster.endpoint_with_port, self.security, self.args, cluster.get_distribution_version(), cluster.fetch_password()) + retry_call(benchmark_test_suite.execute, tries=3, delay=60, backoff=2) + + else: + config = yaml.safe_load(self.args.config) + + with TemporaryDirectory(keep=self.args.keep, chdir=True) as work_dir: + current_workspace = os.path.join(work_dir.name, "opensearch-cluster-cdk") + with GitRepository(self.get_cluster_repo_url(), self.get_git_ref(), current_workspace): + with WorkingDirectory(current_workspace): + with BenchmarkCreateCluster.create(self.args, self.test_manifest, config, current_workspace) as test_cluster: + benchmark_test_suite = BenchmarkTestSuite(test_cluster.endpoint_with_port, self.security, self.args, test_cluster.get_distribution_version(), test_cluster.fetch_password()) + retry_call(benchmark_test_suite.execute, tries=3, delay=60, backoff=2) diff --git a/src/test_workflow/benchmark_test/benchmark_test_suite.py b/src/test_workflow/benchmark_test/benchmark_test_suite.py index 304439a414..c118ab5cb8 100644 --- a/src/test_workflow/benchmark_test/benchmark_test_suite.py +++ b/src/test_workflow/benchmark_test/benchmark_test_suite.py @@ -10,18 +10,16 @@ import subprocess from typing import Any -import semver - from test_workflow.benchmark_test.benchmark_args import BenchmarkArgs class BenchmarkTestSuite: endpoint: str security: bool - current_workspace: str args: BenchmarkArgs command: str distribution_version: str + password: str """ Represents a performance test suite. This class runs rally test on the deployed cluster with the provided IP. @@ -31,13 +29,16 @@ def __init__( self, endpoint: Any, security: bool, - distribution_version: str, args: BenchmarkArgs, + distribution_version: str, + password: str ) -> None: self.endpoint = endpoint self.security = security self.args = args self.distribution_version = distribution_version + self.password = password + # Pass the cluster endpoints with -t for multi-cluster use cases(e.g. cross-cluster-replication) self.command = 'docker run --rm' if self.args.benchmark_config: @@ -70,11 +71,8 @@ def __init__( self.command += f" --telemetry-params '{self.args.telemetry_params}'" def execute(self) -> None: - password: str = 'admin' if self.security: - if semver.compare(self.distribution_version, '2.12.0') != -1: - password = 'myStrongPassword123!' - self.command += f' --client-options="timeout:300,use_ssl:true,verify_certs:false,basic_auth_user:\'admin\',basic_auth_password:\'{password}\'"' + self.command += f' --client-options="timeout:300,use_ssl:true,verify_certs:false,basic_auth_user:\'admin\',basic_auth_password:\'{self.password}\'"' else: self.command += ' --client-options="timeout:300"' logging.info(f"Executing {self.command}") diff --git a/tests/tests_test_workflow/test_benchmark_args.py b/tests/tests_test_workflow/test_benchmark_args.py index 6a39ddf8df..9d607fa6bd 100644 --- a/tests/tests_test_workflow/test_benchmark_args.py +++ b/tests/tests_test_workflow/test_benchmark_args.py @@ -81,10 +81,10 @@ def test_benchmark_with_distribution_url_and_without_version(self) -> None: self.assertEqual(str(context.exception), "--distribution-version is required parameter while using --distribution-url param.") @patch("argparse._sys.argv", [ARGS_PY, "--config", TEST_CONFIG_PATH, "--workload", "test"]) - def test_benchmark_without_distribution_url_and_without_manifest(self) -> None: + def test_benchmark_without_distribution_url_and_without_manifest_and_cluster_endpoint(self) -> None: with self.assertRaises(Exception) as context: BenchmarkArgs() - self.assertEqual(str(context.exception), "Please provide either --bundle-manifest or --distribution-url to run the performance test.") + self.assertEqual(str(context.exception), "Please provide either --bundle-manifest or --distribution-url or --cluster_endpoint to run the performance test.") @patch("argparse._sys.argv", [ARGS_PY, "--bundle-manifest", TEST_DIST_MANIFEST_PATH, "--config", TEST_CONFIG_PATH, "--workload", "test", "--test-procedure", 'test-procedure,another-test-procedure', "--exclude-tasks", "index,type:search,tag:setup", diff --git a/tests/tests_test_workflow/test_benchmark_workflow/benchmark_test/test_benchmark_create_cluster.py b/tests/tests_test_workflow/test_benchmark_workflow/benchmark_test/test_benchmark_create_cluster.py new file mode 100644 index 0000000000..27b6f6a246 --- /dev/null +++ b/tests/tests_test_workflow/test_benchmark_workflow/benchmark_test/test_benchmark_create_cluster.py @@ -0,0 +1,118 @@ +# Copyright OpenSearch Contributors +# SPDX-License-Identifier: Apache-2.0 +# +# The OpenSearch Contributors require contributions made to +# this file be licensed under the Apache-2.0 license or a +# compatible open source license. + +import os +import unittest +from typing import Optional +from unittest.mock import MagicMock, Mock, patch + +from manifests.bundle_manifest import BundleManifest +from test_workflow.benchmark_test.benchmark_create_cluster import BenchmarkCreateCluster + + +class TestBenchmarkCreateCluster(unittest.TestCase): + DATA = os.path.join(os.path.dirname(__file__), "data") + BUNDLE_MANIFEST = os.path.join(DATA, "bundle_manifest.yml") + + def setUp(self, args: Optional[Mock] = None, use_manifest: bool = True) -> None: + self.args = Mock() + if args: + self.args = args + else: + self.args.workload = "nyc_taxis" + self.args.stack_suffix = "test-suffix" + self.args.insecure = False + self.args.single_node = True + self.args.min_distribution = False + self.manifest = BundleManifest.from_path(self.BUNDLE_MANIFEST) if use_manifest else None + self.stack_name = "stack" + self.security = True + self.config = {"Constants": {"SecurityGroupId": "sg-00000000", "VpcId": "vpc-12345", "AccountId": "12345678", + "Region": "us-west-2", "Role": "role-arn", "serverAccessType": "prefixList", "restrictServerAccessTo": "pl-1234", + "isInternal": "true", "IamRoleArn": "arn:aws:iam::12344567890:role/customRole"}} + self.benchmark_create_cluster = BenchmarkCreateCluster(bundle_manifest=self.manifest, config=self.config, args=self.args, current_workspace="current_workspace") + + @patch("test_workflow.benchmark_test.benchmark_create_cluster.BenchmarkCreateCluster.wait_for_processing") + def test_create_single_node_secure(self, mock_wait_for_processing: Optional[Mock]) -> None: + mock_file = MagicMock(side_effect=[{"opensearch-infra-stack-test-suffix-007-x64": {"loadbalancerurl": "www.example.com"}}]) + with patch("subprocess.check_call") as mock_check_call: + with patch("builtins.open", MagicMock()): + with patch("json.load", mock_file): + self.benchmark_create_cluster.start() + self.assertEqual(mock_check_call.call_count, 1) + self.assertEqual(self.benchmark_create_cluster.endpoint_with_port, 'www.example.com:443') + self.assertEqual(self.benchmark_create_cluster.port, 443) + self.assertTrue("opensearch-infra-stack-test-suffix-007-x64" in self.benchmark_create_cluster.stack_name) + self.assertTrue("securityDisabled=false" in self.benchmark_create_cluster.params) + self.assertTrue("adminPassword=admin" in self.benchmark_create_cluster.params) + self.assertTrue("singleNodeCluster=true" in self.benchmark_create_cluster.params) + self.assertTrue("isInternal=true" in self.benchmark_create_cluster.params) + self.assertTrue("distributionUrl=https://artifacts.opensearch.org/bundles/1.0.0/41d5ae25183d4e699e92debfbe3f83bd/opensearch-1.0.0-linux-x64.tar.gz" in self.benchmark_create_cluster.params) + self.assertTrue(isinstance(self.manifest, BundleManifest)) + with patch("subprocess.check_call") as mock_check_call: + self.benchmark_create_cluster.terminate() + self.assertEqual(mock_check_call.call_count, 1) + + def test_endpoint(self) -> None: + self.assertEqual(self.benchmark_create_cluster.endpoint_with_port, None) + + def test_port(self) -> None: + self.assertEqual(self.benchmark_create_cluster.port, 443) + + @patch("test_workflow.benchmark_test.benchmark_create_cluster.BenchmarkCreateCluster.wait_for_processing") + def test_create_single_node_insecure(self, mock_wait_for_processing: Optional[Mock]) -> None: + self.args.insecure = True + self.args.data_instance_type = 'r5.4xlarge' + TestBenchmarkCreateCluster.setUp(self, self.args) + mock_file = MagicMock(side_effect=[{"opensearch-infra-stack-test-suffix-007-x64": {"loadbalancerurl": "www.example.com"}}]) + with patch("subprocess.check_call") as mock_check_call: + with patch("builtins.open", MagicMock()): + with patch("json.load", mock_file): + self.benchmark_create_cluster.start() + self.assertEqual(mock_check_call.call_count, 1) + + self.assertEqual(self.benchmark_create_cluster.endpoint_with_port, 'www.example.com:80') + self.assertEqual(self.benchmark_create_cluster.port, 80) + self.assertTrue("securityDisabled=true" in self.benchmark_create_cluster.params) + self.assertTrue("dataInstanceType=r5.4xlarge" in self.benchmark_create_cluster.params) + self.assertTrue("customRoleArn=arn:aws:iam::12344567890:role/customRole" in self.benchmark_create_cluster.params) + + @patch("test_workflow.benchmark_test.benchmark_create_cluster.BenchmarkCreateCluster.wait_for_processing") + def test_create_multi_node(self, mock_wait_for_processing: Optional[Mock]) -> None: + self.args.single_node = False + self.args.use_50_percent_heap = True + self.args.enable_remote_store = True + TestBenchmarkCreateCluster.setUp(self, self.args) + mock_file = MagicMock(side_effect=[{"opensearch-infra-stack-test-suffix-007-x64": {"loadbalancerurl": "www.example.com"}}]) + with patch("subprocess.check_call") as mock_check_call: + with patch("builtins.open", MagicMock()): + with patch("json.load", mock_file): + self.benchmark_create_cluster.start() + self.assertEqual(mock_check_call.call_count, 1) + + self.assertTrue("singleNodeCluster=false" in self.benchmark_create_cluster.params) + self.assertTrue("use50PercentHeap=true" in self.benchmark_create_cluster.params) + self.assertTrue("enableRemoteStore=true" in self.benchmark_create_cluster.params) + + @patch("test_workflow.benchmark_test.benchmark_create_cluster.BenchmarkCreateCluster.wait_for_processing") + def test_create_multi_node_without_manifest(self, mock_wait_for_processing: Optional[Mock]) -> None: + self.args.distribution_url = "https://artifacts.opensearch.org/2.10.0/opensearch.tar.gz" + self.args.distribution_version = '2.12.0' + self.args.insecure = False + TestBenchmarkCreateCluster.setUp(self, self.args, False) + mock_file = MagicMock(side_effect=[{"opensearch-infra-stack-test-suffix": {"loadbalancerurl": "www.example.com"}}]) + with patch("subprocess.check_call") as mock_check_call: + with patch("builtins.open", MagicMock()): + with patch("json.load", mock_file): + self.benchmark_create_cluster.start() + self.assertEqual(mock_check_call.call_count, 1) + self.assertTrue("opensearch-infra-stack-test-suffix" in self.benchmark_create_cluster.stack_name) + self.assertTrue("cpuArch=x64" in self.benchmark_create_cluster.params) + self.assertTrue("distVersion=2.12.0" in self.benchmark_create_cluster.params) + self.assertTrue("securityDisabled=false" in self.benchmark_create_cluster.params) + self.assertTrue("adminPassword=myStrongPassword123!" in self.benchmark_create_cluster.params) + self.assertTrue("distributionUrl=https://artifacts.opensearch.org/2.10.0/opensearch.tar.gz" in self.benchmark_create_cluster.params) diff --git a/tests/tests_test_workflow/test_benchmark_workflow/benchmark_test/test_benchmark_test_cluster.py b/tests/tests_test_workflow/test_benchmark_workflow/benchmark_test/test_benchmark_test_cluster.py index e058c6df36..ae5163e5ff 100644 --- a/tests/tests_test_workflow/test_benchmark_workflow/benchmark_test/test_benchmark_test_cluster.py +++ b/tests/tests_test_workflow/test_benchmark_workflow/benchmark_test/test_benchmark_test_cluster.py @@ -5,114 +5,81 @@ # this file be licensed under the Apache-2.0 license or a # compatible open source license. -import os +import subprocess import unittest -from typing import Optional from unittest.mock import MagicMock, Mock, patch -from manifests.bundle_manifest import BundleManifest from test_workflow.benchmark_test.benchmark_test_cluster import BenchmarkTestCluster class TestBenchmarkTestCluster(unittest.TestCase): - DATA = os.path.join(os.path.dirname(__file__), "data") - BUNDLE_MANIFEST = os.path.join(DATA, "bundle_manifest.yml") - - def setUp(self, args: Optional[Mock] = None, use_manifest: bool = True) -> None: + def setUp(self) -> None: self.args = Mock() - if args: - self.args = args - else: - self.args.workload = "nyc_taxis" - self.args.stack_suffix = "test-suffix" - self.args.insecure = False - self.args.single_node = True - self.args.min_distribution = False - self.manifest = BundleManifest.from_path(self.BUNDLE_MANIFEST) if use_manifest else None - self.stack_name = "stack" - self.security = True - self.config = {"Constants": {"SecurityGroupId": "sg-00000000", "VpcId": "vpc-12345", "AccountId": "12345678", - "Region": "us-west-2", "Role": "role-arn", "serverAccessType": "prefixList", "restrictServerAccessTo": "pl-1234", - "isInternal": "true", "IamRoleArn": "arn:aws:iam::12344567890:role/customRole"}} - self.benchmark_test_cluster = BenchmarkTestCluster(bundle_manifest=self.manifest, config=self.config, args=self.args, current_workspace="current_workspace") + self.password = None + self.cluster_endpoint_with_port = None - @patch("test_workflow.benchmark_test.benchmark_test_cluster.BenchmarkTestCluster.wait_for_processing") - def test_create_single_node_secure(self, mock_wait_for_processing: Optional[Mock]) -> None: - mock_file = MagicMock(side_effect=[{"opensearch-infra-stack-test-suffix-007-x64": {"loadbalancerurl": "www.example.com"}}]) - with patch("subprocess.check_call") as mock_check_call: - with patch("builtins.open", MagicMock()): - with patch("json.load", mock_file): - self.benchmark_test_cluster.start() - self.assertEqual(mock_check_call.call_count, 1) - self.assertEqual(self.benchmark_test_cluster.endpoint_with_port, 'www.example.com:443') - self.assertEqual(self.benchmark_test_cluster.port, 443) - self.assertTrue("opensearch-infra-stack-test-suffix-007-x64" in self.benchmark_test_cluster.stack_name) - self.assertTrue("securityDisabled=false" in self.benchmark_test_cluster.params) - self.assertTrue("adminPassword" not in self.benchmark_test_cluster.params) - self.assertTrue("singleNodeCluster=true" in self.benchmark_test_cluster.params) - self.assertTrue("isInternal=true" in self.benchmark_test_cluster.params) - self.assertTrue("distributionUrl=https://artifacts.opensearch.org/bundles/1.0.0/41d5ae25183d4e699e92debfbe3f83bd/opensearch-1.0.0-linux-x64.tar.gz" in self.benchmark_test_cluster.params) - self.assertTrue(isinstance(self.manifest, BundleManifest)) - with patch("subprocess.check_call") as mock_check_call: - self.benchmark_test_cluster.terminate() - self.assertEqual(mock_check_call.call_count, 1) + self.benchmark_test_cluster = BenchmarkTestCluster(self.args) - def test_endpoint(self) -> None: - self.assertEqual(self.benchmark_test_cluster.endpoint_with_port, None) + @patch("subprocess.run") + @patch("requests.get") + def test_endpoint_without_security(self, mock_requests_get: Mock, mock_subprocess_run: Mock) -> None: + self.args.insecure = True + self.args.cluster_endpoint = "opensearch-cluster.amazon.com" + self.cluster_endpoint_with_port = None + mock_result = MagicMock() + mock_result.stdout = ''' + { + "cluster_name" : "opensearch-cluster.amazon.com”, + "version": { + "distribution": "opensearch", + "number": “2.9.0”, + "build_type": "tar", + "minimum_index_compatibility_version": "2.0.0" + } + } + ''' + mock_subprocess_run.return_value = mock_result + with patch("json.loads", ): + self.benchmark_test_cluster.start() + mock_requests_get.assert_called_with(url="http://opensearch-cluster.amazon.com/_cluster/health") + self.assertEqual(self.benchmark_test_cluster.endpoint_with_port, 'opensearch-cluster.amazon.com:80') + self.assertEqual(self.benchmark_test_cluster.port, 80) - def test_port(self) -> None: + @patch('test_workflow.benchmark_test.benchmark_test_cluster.get_password') + @patch("subprocess.run") + @patch("requests.get") + @patch('test_workflow.benchmark_test.benchmark_test_cluster.HTTPBasicAuth') + def test_endpoint_with_security(self, mock_http_auth: Mock, mock_requests_get: Mock, mock_subprocess_run: Mock, + mock_password: Mock) -> None: + self.args.insecure = False + self.args.cluster_endpoint = "opensearch-cluster.amazon.com" + mock_password.return_value = "admin" + mock_result = MagicMock() + mock_result.stdout = ''' + { + "cluster_name" : "opensearch-cluster.amazon.com", + "version": { + "distribution": "opensearch", + "number": "2.9.0", + "build_type": "tar", + "minimum_index_compatibility_version": "2.0.0" + } + } + ''' + mock_subprocess_run.return_value = mock_result + with patch("json.loads"): + self.benchmark_test_cluster.start() + mock_requests_get.assert_called_with(url=f"https://{self.args.cluster_endpoint}/_cluster/health", auth=mock_http_auth.return_value, verify=False) + self.assertEqual(self.benchmark_test_cluster.endpoint_with_port, 'opensearch-cluster.amazon.com:443') self.assertEqual(self.benchmark_test_cluster.port, 443) - @patch("test_workflow.benchmark_test.benchmark_test_cluster.BenchmarkTestCluster.wait_for_processing") - def test_create_single_node_insecure(self, mock_wait_for_processing: Optional[Mock]) -> None: + def test_endpoint_with_timeout_error(self) -> None: self.args.insecure = True - self.args.data_instance_type = 'r5.4xlarge' - TestBenchmarkTestCluster.setUp(self, self.args) - mock_file = MagicMock(side_effect=[{"opensearch-infra-stack-test-suffix-007-x64": {"loadbalancerurl": "www.example.com"}}]) - with patch("subprocess.check_call") as mock_check_call: - with patch("builtins.open", MagicMock()): - with patch("json.load", mock_file): - self.benchmark_test_cluster.start() - self.assertEqual(mock_check_call.call_count, 1) + self.args.cluster_endpoint = "opensearch-cluster.amazon.com" + with patch('subprocess.run') as mock_run: + mock_run.side_effect = subprocess.TimeoutExpired("Command", 5) - self.assertEqual(self.benchmark_test_cluster.endpoint_with_port, 'www.example.com:80') - self.assertEqual(self.benchmark_test_cluster.port, 80) - self.assertTrue("securityDisabled=true" in self.benchmark_test_cluster.params) - self.assertTrue("dataInstanceType=r5.4xlarge" in self.benchmark_test_cluster.params) - self.assertTrue("customRoleArn=arn:aws:iam::12344567890:role/customRole" in self.benchmark_test_cluster.params) - - @patch("test_workflow.benchmark_test.benchmark_test_cluster.BenchmarkTestCluster.wait_for_processing") - def test_create_multi_node(self, mock_wait_for_processing: Optional[Mock]) -> None: - self.args.single_node = False - self.args.use_50_percent_heap = True - self.args.enable_remote_store = True - TestBenchmarkTestCluster.setUp(self, self.args) - mock_file = MagicMock(side_effect=[{"opensearch-infra-stack-test-suffix-007-x64": {"loadbalancerurl": "www.example.com"}}]) - with patch("subprocess.check_call") as mock_check_call: - with patch("builtins.open", MagicMock()): - with patch("json.load", mock_file): - self.benchmark_test_cluster.start() - self.assertEqual(mock_check_call.call_count, 1) + with self.assertRaises(TimeoutError) as context: + self.benchmark_test_cluster.start() - self.assertTrue("singleNodeCluster=false" in self.benchmark_test_cluster.params) - self.assertTrue("use50PercentHeap=true" in self.benchmark_test_cluster.params) - self.assertTrue("enableRemoteStore=true" in self.benchmark_test_cluster.params) - - @patch("test_workflow.benchmark_test.benchmark_test_cluster.BenchmarkTestCluster.wait_for_processing") - def test_create_multi_node_without_manifest(self, mock_wait_for_processing: Optional[Mock]) -> None: - self.args.distribution_url = "https://artifacts.opensearch.org/2.10.0/opensearch.tar.gz" - self.args.distribution_version = '2.12.0' - self.args.insecure = False - TestBenchmarkTestCluster.setUp(self, self.args, False) - mock_file = MagicMock(side_effect=[{"opensearch-infra-stack-test-suffix": {"loadbalancerurl": "www.example.com"}}]) - with patch("subprocess.check_call") as mock_check_call: - with patch("builtins.open", MagicMock()): - with patch("json.load", mock_file): - self.benchmark_test_cluster.start() - self.assertEqual(mock_check_call.call_count, 1) - self.assertTrue("opensearch-infra-stack-test-suffix" in self.benchmark_test_cluster.stack_name) - self.assertTrue("cpuArch=x64" in self.benchmark_test_cluster.params) - self.assertTrue("distVersion=2.12.0" in self.benchmark_test_cluster.params) - self.assertTrue("securityDisabled=false" in self.benchmark_test_cluster.params) - self.assertTrue("adminPassword=myStrongPassword123!" in self.benchmark_test_cluster.params) - self.assertTrue("distributionUrl=https://artifacts.opensearch.org/2.10.0/opensearch.tar.gz" in self.benchmark_test_cluster.params) + self.assertIn("Time out! Couldn't connect to the cluster", str(context.exception)) diff --git a/tests/tests_test_workflow/test_benchmark_workflow/benchmark_test/test_benchmark_test_cluster_min.py b/tests/tests_test_workflow/test_benchmark_workflow/benchmark_test/test_benchmark_test_cluster_min.py index 03b7ca660b..068b78fa34 100644 --- a/tests/tests_test_workflow/test_benchmark_workflow/benchmark_test/test_benchmark_test_cluster_min.py +++ b/tests/tests_test_workflow/test_benchmark_workflow/benchmark_test/test_benchmark_test_cluster_min.py @@ -11,10 +11,10 @@ from unittest.mock import MagicMock, Mock, patch from manifests.build_manifest import BuildManifest -from test_workflow.benchmark_test.benchmark_test_cluster import BenchmarkTestCluster +from test_workflow.benchmark_test.benchmark_create_cluster import BenchmarkCreateCluster -class TestBenchmarkTestClusterMin(unittest.TestCase): +class TestBenchmarkCreateClusterMin(unittest.TestCase): DATA = os.path.join(os.path.dirname(__file__), "data") MIN_MANIFEST = os.path.join(DATA, "min_distribution_manifest.yml") @@ -32,19 +32,19 @@ def setUp(self, args: Optional[Mock] = None) -> None: self.config = {"Constants": {"SecurityGroupId": "sg-00000000", "VpcId": "vpc-12345", "AccountId": "12345678", "Region": "us-west-2", "Role": "role-arn", "serverAccessType": "prefixList", "restrictServerAccessTo": "pl-1234", "isInternal": "true", "IamRoleArn": ""}} - self.benchmark_test_cluster = BenchmarkTestCluster(bundle_manifest=self.manifest, config=self.config, args=self.args, current_workspace="current_workspace") + self.benchmark_create_cluster = BenchmarkCreateCluster(bundle_manifest=self.manifest, config=self.config, args=self.args, current_workspace="current_workspace") - @patch("test_workflow.benchmark_test.benchmark_test_cluster.BenchmarkTestCluster.wait_for_processing") + @patch("test_workflow.benchmark_test.benchmark_create_cluster.BenchmarkCreateCluster.wait_for_processing") def test_create_min_cluster(self, mock_wait_for_processing: Optional[Mock]) -> None: mock_file = MagicMock(side_effect=[{"opensearch-infra-stack-test-suffix-8042-arm64": {"loadbalancerurl": "www.example.com"}}]) with patch("subprocess.check_call") as mock_check_call: with patch("builtins.open", MagicMock()): with patch("json.load", mock_file): - self.benchmark_test_cluster.start() + self.benchmark_create_cluster.start() self.assertEqual(mock_check_call.call_count, 1) self.assertTrue(isinstance(self.manifest, BuildManifest)) - self.assertTrue("securityDisabled=true" in self.benchmark_test_cluster.params) - self.assertTrue("minDistribution=true" in self.benchmark_test_cluster.params) + self.assertTrue("securityDisabled=true" in self.benchmark_create_cluster.params) + self.assertTrue("minDistribution=true" in self.benchmark_create_cluster.params) self.assertTrue("distributionUrl=https://artifacts.opensearch.org/snapshots/core/opensearch/2.9.0-SNAPSHOT/" - "opensearch-min-2.9.0-SNAPSHOT-linux-arm64-latest.tar.gz" in self.benchmark_test_cluster.params) - self.assertTrue("customRoleArn" not in self.benchmark_test_cluster.params) + "opensearch-min-2.9.0-SNAPSHOT-linux-arm64-latest.tar.gz" in self.benchmark_create_cluster.params) + self.assertTrue("customRoleArn" not in self.benchmark_create_cluster.params) diff --git a/tests/tests_test_workflow/test_benchmark_workflow/benchmark_test/test_benchmark_test_runner_opensearch.py b/tests/tests_test_workflow/test_benchmark_workflow/benchmark_test/test_benchmark_test_runner_opensearch.py index b41e308d5a..62736438db 100644 --- a/tests/tests_test_workflow/test_benchmark_workflow/benchmark_test/test_benchmark_test_runner_opensearch.py +++ b/tests/tests_test_workflow/test_benchmark_workflow/benchmark_test/test_benchmark_test_runner_opensearch.py @@ -7,11 +7,12 @@ import os import tempfile import unittest -from typing import Any -from unittest.mock import Mock, patch +from typing import Any, Optional +from unittest.mock import MagicMock, Mock, patch from manifests.bundle_manifest import BundleManifest from test_workflow.benchmark_test.benchmark_args import BenchmarkArgs +from test_workflow.benchmark_test.benchmark_test_runner_opensearch import BenchmarkTestRunnerOpenSearch from test_workflow.benchmark_test.benchmark_test_runners import BenchmarkTestRunners @@ -26,7 +27,7 @@ class TestBenchmarkTestRunnerOpenSearch(unittest.TestCase): @patch("os.chdir") @patch("test_workflow.benchmark_test.benchmark_test_runner_opensearch.TemporaryDirectory") @patch("test_workflow.benchmark_test.benchmark_test_runner_opensearch.GitRepository") - @patch("test_workflow.benchmark_test.benchmark_test_runner_opensearch.BenchmarkTestCluster.create") + @patch("test_workflow.benchmark_test.benchmark_test_runner_opensearch.BenchmarkCreateCluster.create") @patch("test_workflow.benchmark_test.benchmark_test_runner_opensearch.BenchmarkTestSuite") def test_run(self, mock_suite: Mock, mock_cluster: Mock, mock_git: Mock, mock_temp_directory: Mock, *mocks: Any) -> None: @@ -56,7 +57,7 @@ def test_run(self, mock_suite: Mock, mock_cluster: Mock, mock_git: Mock, mock_te @patch("os.chdir") @patch("test_workflow.benchmark_test.benchmark_test_runner_opensearch.TemporaryDirectory") @patch("test_workflow.benchmark_test.benchmark_test_runner_opensearch.GitRepository") - @patch("test_workflow.benchmark_test.benchmark_test_runner_opensearch.BenchmarkTestCluster.create") + @patch("test_workflow.benchmark_test.benchmark_test_runner_opensearch.BenchmarkCreateCluster.create") @patch("test_workflow.benchmark_test.benchmark_test_runner_opensearch.BenchmarkTestSuite") def test_run_with_dist_url_and_version(self, mock_suite: Mock, mock_cluster: Mock, mock_git: Mock, mock_temp_directory: Mock, @@ -73,3 +74,53 @@ def test_run_with_dist_url_and_version(self, mock_suite: Mock, mock_cluster: Moc self.assertEqual(mock_cluster.call_count, 1) self.assertEqual(mock_git.call_count, 1) self.assertEqual(mock_temp_directory.call_count, 1) + + @patch("test_workflow.benchmark_test.benchmark_test_runner_opensearch.BenchmarkTestCluster.start") + @patch("test_workflow.benchmark_test.benchmark_test_runner_opensearch.BenchmarkTestSuite") + @patch('test_workflow.benchmark_test.benchmark_test_runner_opensearch.retry_call') + def test_run_with_cluster_endpoint(self, mock_retry_call: Mock, mock_suite: Mock, mock_benchmark_test_cluster: Mock) -> None: + args = MagicMock(cluster_endpoint=True) + mock_cluster = MagicMock() + + mock_benchmark_test_cluster.return_value = mock_cluster + instance = BenchmarkTestRunnerOpenSearch(args, None) + instance.run_tests() + self.assertEqual(mock_suite.call_count, 1) + self.assertEqual(mock_benchmark_test_cluster.call_count, 1) + mock_retry_call.assert_called_once_with(mock_suite.return_value.execute, tries=3, delay=60, backoff=2) + + @patch('test_workflow.benchmark_test.benchmark_test_cluster.BenchmarkTestCluster.wait_for_processing') + @patch("test_workflow.benchmark_test.benchmark_test_runner_opensearch.BenchmarkTestSuite") + @patch('test_workflow.benchmark_test.benchmark_test_runner_opensearch.retry_call') + @patch("subprocess.run") + @patch("requests.get") + def test_run_with_cluster_endpoint_with_arguments(self, mock_requests_get: Mock, mock_subprocess_run: Mock, + mock_retry_call: Mock, mock_suite: Mock, mock_wait_for_processing: Optional[Mock]) -> None: + args = MagicMock(cluster_endpoint=True) + mock_wait_for_processing.return_value = None + mock_result = MagicMock() + mock_result.stdout = ''' + { + "cluster_name" : "opensearch-cluster.amazon.com", + "version": { + "distribution": "opensearch", + "number": "2.9.0", + "build_type": "tar", + "minimum_index_compatibility_version": "2.0.0" + } + } + ''' + mock_subprocess_run.return_value = mock_result + + instance = BenchmarkTestRunnerOpenSearch(args, None) + with patch('test_workflow.benchmark_test.benchmark_test_runner_opensearch.BenchmarkTestCluster') as MockBenchmarkTestCluster: + mock_cluster_instance = MockBenchmarkTestCluster.return_value + mock_cluster_instance.endpoint_with_port = "opensearch-cluster.amazon.com" + mock_cluster_instance.get_distribution_version.return_value = "2.9.0" + mock_cluster_instance.fetch_password.return_value = "admin" + + with patch("json.loads"): + instance.run_tests() + self.assertEqual(mock_suite.call_count, 1) + self.assertEqual(MockBenchmarkTestCluster.call_count, 1) + mock_retry_call.assert_called_once_with(mock_suite.return_value.execute, tries=3, delay=60, backoff=2) diff --git a/tests/tests_test_workflow/test_benchmark_workflow/benchmark_test/test_benchmark_test_suite.py b/tests/tests_test_workflow/test_benchmark_workflow/benchmark_test/test_benchmark_test_suite.py index 2f9e2151d3..525787e627 100644 --- a/tests/tests_test_workflow/test_benchmark_workflow/benchmark_test/test_benchmark_test_suite.py +++ b/tests/tests_test_workflow/test_benchmark_workflow/benchmark_test/test_benchmark_test_suite.py @@ -10,6 +10,7 @@ from unittest.mock import Mock, patch from test_workflow.benchmark_test.benchmark_test_suite import BenchmarkTestSuite +from test_workflow.integ_test.utils import get_password class TestBenchmarkTestSuite(unittest.TestCase): @@ -25,7 +26,7 @@ def setUp(self, **kwargs: Any) -> None: self.args.exclude_tasks = kwargs['exclude_tasks'] if 'exclude_tasks' in kwargs else None self.args.include_tasks = kwargs['include_tasks'] if 'include_tasks' in kwargs else None self.endpoint = "abc.com" - self.benchmark_test_suite = BenchmarkTestSuite(endpoint=self.endpoint, security=False, distribution_version='2.3.0', args=self.args) + self.benchmark_test_suite = BenchmarkTestSuite(endpoint=self.endpoint, security=False, distribution_version='2.3.0', args=self.args, password=get_password('2.3.0')) def test_execute_default(self) -> None: with patch("subprocess.check_call") as mock_check_call: @@ -36,7 +37,7 @@ def test_execute_default(self) -> None: '--pipeline=benchmark-only --target-hosts=abc.com --client-options="timeout:300"') def test_execute_security_enabled_version_212_or_greater(self) -> None: - benchmark_test_suite = BenchmarkTestSuite(endpoint=self.endpoint, security=True, distribution_version='2.12.0', args=self.args) + benchmark_test_suite = BenchmarkTestSuite(endpoint=self.endpoint, security=True, distribution_version='2.12.0', args=self.args, password=get_password('2.12.0')) with patch("subprocess.check_call") as mock_check_call: benchmark_test_suite.execute() self.assertEqual(mock_check_call.call_count, 1) @@ -47,7 +48,7 @@ def test_execute_security_enabled_version_212_or_greater(self) -> None: 'verify_certs:false,basic_auth_user:\'admin\',basic_auth_password:\'myStrongPassword123!\'"') def test_execute_security_enabled(self) -> None: - benchmark_test_suite = BenchmarkTestSuite(endpoint=self.endpoint, security=True, distribution_version='2.3.0', args=self.args) + benchmark_test_suite = BenchmarkTestSuite(endpoint=self.endpoint, security=True, distribution_version='2.3.0', args=self.args, password=get_password('2.3.0')) with patch("subprocess.check_call") as mock_check_call: benchmark_test_suite.execute() self.assertEqual(mock_check_call.call_count, 1)