Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add mechanism to run benchmark tests against a given cluster endpoint #4541

Merged
merged 2 commits into from
Mar 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions src/test_workflow/benchmark_test/benchmark_args.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
# Contains the arguments required to run a perf test.
class BenchmarkArgs:
bundle_manifest: IO
cluster_endpoint: str
distribution_url: str
distribution_version: str
stack_suffix: str
Expand Down Expand Up @@ -54,12 +55,14 @@ def __init__(self) -> None:
parser = argparse.ArgumentParser(description="Test an OpenSearch Bundle")
parser.add_argument("--bundle-manifest", type=argparse.FileType("r"), help="Bundle Manifest file.")
parser.add_argument("--distribution-url", dest="distribution_url", help="Link to a downloadable OpenSearch tarball.")
parser.add_argument("--cluster-endpoint", dest="cluster_endpoint",
help="Load balancer url for benchmark testing")
parser.add_argument("--distribution-version", dest="distribution_version",
help="provide OpenSearch version if using distribution-url param.")
parser.add_argument("--suffix", dest="suffix", help="Suffix to be added to stack name for performance test")
parser.add_argument("--component", dest="component", default="OpenSearch",
help="Component name that needs to be performance tested")
parser.add_argument("--config", type=argparse.FileType("r"), help="Config file.", required=True)
parser.add_argument("--config", type=argparse.FileType("r"), help="Config file.")
parser.add_argument("--without-security", dest="insecure", action="store_true",
help="Force the security of the cluster to be disabled.", default=False)
parser.add_argument("--keep", dest="keep", action="store_true",
Expand Down Expand Up @@ -121,6 +124,7 @@ def __init__(self) -> None:
args = parser.parse_args()
self.bundle_manifest = args.bundle_manifest if args.bundle_manifest else None
self.distribution_url = args.distribution_url if args.distribution_url else None
self.cluster_endpoint = args.cluster_endpoint if args.cluster_endpoint else None
self.distribution_version = args.distribution_version if args.distribution_version else None
self.stack_suffix = args.suffix if args.suffix else None
self.config = args.config
Expand Down Expand Up @@ -152,7 +156,7 @@ def __init__(self) -> None:
self.telemetry_params = args.telemetry_params if args.telemetry_params else None
self.logging_level = args.logging_level

if self.bundle_manifest is None and self.distribution_url is None:
raise Exception('Please provide either --bundle-manifest or --distribution-url to run the performance test.')
if self.bundle_manifest is None and self.distribution_url is None and self.cluster_endpoint is None:
raise Exception('Please provide either --bundle-manifest or --distribution-url or --cluster_endpoint to run the performance test.')
elif self.distribution_url and self.distribution_version is None:
raise Exception("--distribution-version is required parameter while using --distribution-url param.")
156 changes: 156 additions & 0 deletions src/test_workflow/benchmark_test/benchmark_create_cluster.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
# Copyright OpenSearch Contributors
# SPDX-License-Identifier: Apache-2.0
#
# The OpenSearch Contributors require contributions made to
# this file be licensed under the Apache-2.0 license or a
# compatible open source license.


import json
import logging
import os
import subprocess
from contextlib import contextmanager
from typing import Any, Generator, Union

from manifests.build_manifest import BuildManifest
from manifests.bundle_manifest import BundleManifest
from test_workflow.benchmark_test.benchmark_args import BenchmarkArgs
from test_workflow.benchmark_test.benchmark_test_cluster import BenchmarkTestCluster
from test_workflow.integ_test.utils import get_password


class BenchmarkCreateCluster(BenchmarkTestCluster):
manifest: Union[BundleManifest, BuildManifest]
work_dir: str
current_workspace: str
output_file: str
params: str
is_endpoint_public: bool
cluster_endpoint: str

"""
Represents a performance test cluster. This class deploys the opensearch bundle with CDK. Supports both single
and multi-node clusters
"""

def __init__(
self,
args: BenchmarkArgs,
bundle_manifest: Union[BundleManifest, BuildManifest],
config: dict,
current_workspace: str
) -> None:
super().__init__(args)
self.manifest = bundle_manifest
self.current_workspace = current_workspace
self.output_file = "output.json"
role = config["Constants"]["Role"]
params_dict = self.setup_cdk_params(config)
params_list = []
for key, value in params_dict.items():
if value:
'''
TODO: To send json input to typescript code from command line it needs to be enclosed in
single-quotes, this is a temp fix to achieve that since the quoted string passed from command line in
tesh.sh wrapper script gets un-quoted and we need to handle it here.
'''
if key == 'additionalConfig':
params_list.append(f" -c {key}=\'{value}\'")
else:
params_list.append(f" -c {key}={value}")
role_params = (
f" --require-approval=never --plugin cdk-assume-role-credential-plugin"
f" -c assume-role-credentials:writeIamRoleName={role} -c assume-role-credentials:readIamRoleName={role} "
)
self.params = "".join(params_list) + role_params
self.is_endpoint_public = False
self.stack_name = f"opensearch-infra-stack-{self.args.stack_suffix}"
if self.manifest:
self.stack_name += f"-{self.manifest.build.id}-{self.manifest.build.architecture}"

def start(self) -> None:
command = f"npm install && cdk deploy \"*\" {self.params} --outputs-file {self.output_file}"

logging.info(f'Executing "{command}" in {os.getcwd()}')
subprocess.check_call(command, cwd=os.getcwd(), shell=True)
with open(self.output_file, "r") as read_file:
load_output = json.load(read_file)
self.create_endpoint(load_output)
self.wait_for_processing()
logging.info("Wait for processing executed")

def create_endpoint(self, cdk_output: dict) -> None:
load_balancer_url = cdk_output[self.stack_name].get('loadbalancerurl', None)
if load_balancer_url is None:
raise RuntimeError("Unable to fetch the cluster endpoint from cdk output")

Check warning on line 86 in src/test_workflow/benchmark_test/benchmark_create_cluster.py

View check run for this annotation

Codecov / codecov/patch

src/test_workflow/benchmark_test/benchmark_create_cluster.py#L86

Added line #L86 was not covered by tests
self.cluster_endpoint = load_balancer_url
self.cluster_endpoint_with_port = "".join([load_balancer_url, ":", str(self.port)])

def terminate(self) -> None:
command = f"cdk destroy {self.stack_name} {self.params} --force"
logging.info(f'Executing "{command}" in {os.getcwd()}')

subprocess.check_call(command, cwd=os.getcwd(), shell=True)

def setup_cdk_params(self, config: dict) -> dict:
suffix = ''
if self.args.stack_suffix and self.manifest:
suffix = self.args.stack_suffix + '-' + self.manifest.build.id + '-' + self.manifest.build.architecture
elif self.manifest:
suffix = self.manifest.build.id + '-' + self.manifest.build.architecture

Check warning on line 101 in src/test_workflow/benchmark_test/benchmark_create_cluster.py

View check run for this annotation

Codecov / codecov/patch

src/test_workflow/benchmark_test/benchmark_create_cluster.py#L101

Added line #L101 was not covered by tests
elif self.args.stack_suffix:
suffix = self.args.stack_suffix

if self.manifest:
self.args.distribution_version = self.manifest.build.version
artifact_url = self.manifest.build.location if isinstance(self.manifest, BundleManifest) else \
f"https://artifacts.opensearch.org/snapshots/core/opensearch/{self.manifest.build.version}/opensearch-min-" \
f"{self.manifest.build.version}-linux-{self.manifest.build.architecture}-latest.tar.gz"
else:
artifact_url = self.args.distribution_url.strip()

return {
"distributionUrl": artifact_url,
"vpcId": config["Constants"]["VpcId"],
"account": config["Constants"]["AccountId"],
"region": config["Constants"]["Region"],
"suffix": suffix,
"securityDisabled": str(self.args.insecure).lower(),
"adminPassword": None if self.args.insecure else get_password(self.args.distribution_version),
"cpuArch": self.manifest.build.architecture if self.manifest else 'x64',
"singleNodeCluster": str(self.args.single_node).lower(),
"distVersion": self.args.distribution_version,
"minDistribution": str(self.args.min_distribution).lower(),
"serverAccessType": config["Constants"]["serverAccessType"],
"restrictServerAccessTo": config["Constants"]["restrictServerAccessTo"],
"additionalConfig": self.args.additional_config,
"dataInstanceType": self.args.data_instance_type,
"managerNodeCount": self.args.manager_node_count,
"dataNodeCount": self.args.data_node_count,
"clientNodeCount": self.args.client_node_count,
"ingestNodeCount": self.args.ingest_node_count,
"mlNodeCount": self.args.ml_node_count,
"dataNodeStorage": self.args.data_node_storage,
"mlNodeStorage": self.args.ml_node_storage,
"jvmSysProps": self.args.jvm_sys_props,
"use50PercentHeap": str(self.args.use_50_percent_heap).lower(),
"isInternal": config["Constants"]["isInternal"],
"enableRemoteStore": str(self.args.enable_remote_store).lower(),
"customRoleArn": config["Constants"]["IamRoleArn"]
}

@classmethod
@contextmanager
def create(cls, *args: Any) -> Generator[Any, None, None]:
"""
Set up the cluster. When this method returns, the cluster must be available to take requests.
Throws ClusterCreationException if the cluster could not start for some reason. If this exception is thrown, the caller does not need to call "destroy".
"""
cluster = cls(*args)

Check warning on line 150 in src/test_workflow/benchmark_test/benchmark_create_cluster.py

View check run for this annotation

Codecov / codecov/patch

src/test_workflow/benchmark_test/benchmark_create_cluster.py#L150

Added line #L150 was not covered by tests

try:
cluster.start()
yield cluster

Check warning on line 154 in src/test_workflow/benchmark_test/benchmark_create_cluster.py

View check run for this annotation

Codecov / codecov/patch

src/test_workflow/benchmark_test/benchmark_create_cluster.py#L152-L154

Added lines #L152 - L154 were not covered by tests
finally:
cluster.terminate()

Check warning on line 156 in src/test_workflow/benchmark_test/benchmark_create_cluster.py

View check run for this annotation

Codecov / codecov/patch

src/test_workflow/benchmark_test/benchmark_create_cluster.py#L156

Added line #L156 was not covered by tests
Loading
Loading