From 5b33489174457656c36a387bb6c9b09746c00228 Mon Sep 17 00:00:00 2001 From: Salomon Popp Date: Thu, 22 Dec 2022 14:08:30 +0100 Subject: [PATCH] Fix generate connecting to Kafka REST proxy (#41) --- .gitignore | 2 + .../component_handlers/topic/proxy_wrapper.py | 45 ++++++++----------- .../topic/test_proxy_wrapper.py | 17 +++---- tests/pipeline/test_pipeline.py | 17 ------- 4 files changed, 30 insertions(+), 51 deletions(-) diff --git a/.gitignore b/.gitignore index de2bac001..ddfed1335 100644 --- a/.gitignore +++ b/.gitignore @@ -3,3 +3,5 @@ .DS_Store .envrc .vscode +pipelines/ +defaults/ diff --git a/kpops/component_handlers/topic/proxy_wrapper.py b/kpops/component_handlers/topic/proxy_wrapper.py index a93e02d7e..f58f618a1 100644 --- a/kpops/component_handlers/topic/proxy_wrapper.py +++ b/kpops/component_handlers/topic/proxy_wrapper.py @@ -1,4 +1,5 @@ import logging +from functools import cached_property import requests @@ -25,23 +26,30 @@ class ProxyWrapper: """ def __init__(self, pipeline_config: PipelineConfig) -> None: - """ - Default constructor. Sets the cluster ID by sending a requests to the rest proxy. - More information about the cluster ID can be found here: - https://docs.confluent.io/platform/current/kafka-rest/api.html#cluster-v3 - """ if not pipeline_config.kafka_rest_host: raise ValueError( - "The Kafka Rest Proxy host is not set. Please set the host in the config.yaml using the kafka_rest_host property or set the environemt variable KPOPS_REST_PROXY_HOST." + "The Kafka REST Proxy host is not set. Please set the host in the config.yaml using the kafka_rest_host property or set the environemt variable KPOPS_REST_PROXY_HOST." ) - # TODO: Initialize the cluster id at the beginning - self._cluster_id = self.__get_cluster_id(host=pipeline_config.kafka_rest_host) self._host = pipeline_config.kafka_rest_host - @property + @cached_property def cluster_id(self) -> str: - return self._cluster_id + """ + Gets the Kafka cluster ID by sending a requests to Kafka REST proxy. + More information about the cluster ID can be found here: + https://docs.confluent.io/platform/current/kafka-rest/api.html#cluster-v3 + + Currently both Kafka and Kafka REST Proxy are only aware of the Kafka cluster pointed at by the + bootstrap.servers configuration. Therefore, only one Kafka cluster will be returned. + :return: The Kafka cluster ID. + """ + response = requests.get(url=f"{self._host}/v3/clusters") + if response.status_code == requests.status_codes.codes.ok: + cluster_information = response.json() + return cluster_information["data"][0]["cluster_id"] + + raise KafkaRestProxyError(response) @property def host(self) -> str: @@ -54,7 +62,7 @@ def create_topic(self, topic_spec: TopicSpec) -> None: :param topic_spec: The topic specification. """ response = requests.post( - url=f"{self._host}/v3/clusters/{self._cluster_id}/topics", + url=f"{self._host}/v3/clusters/{self.cluster_id}/topics", headers=HEADERS, json=topic_spec.dict(exclude_unset=True, exclude_none=True), ) @@ -169,18 +177,3 @@ def get_broker_config(self) -> BrokerConfigResponse: return BrokerConfigResponse(**response.json()) raise KafkaRestProxyError(response) - - @classmethod - def __get_cluster_id(cls, host: str) -> str: - """ - Currently both Kafka and Kafka REST Proxy are only aware of the Kafka cluster pointed at by the - bootstrap.servers configuration. Therefore, only one Kafka cluster will be returned. - :param host: - :return: - """ - response = requests.get(url=f"{host}/v3/clusters") - if response.status_code == requests.status_codes.codes.ok: - cluster_information = response.json() - return cluster_information["data"][0]["cluster_id"] - - raise KafkaRestProxyError(response) diff --git a/tests/component_handlers/topic/test_proxy_wrapper.py b/tests/component_handlers/topic/test_proxy_wrapper.py index fb1d7b659..b3f823a36 100644 --- a/tests/component_handlers/topic/test_proxy_wrapper.py +++ b/tests/component_handlers/topic/test_proxy_wrapper.py @@ -32,7 +32,14 @@ def log_debug_mock(self, mocker: MockerFixture) -> MagicMock: @pytest.fixture(autouse=True) @responses.activate def setup(self): - with open(DEFAULTS_PATH / "kafka_rest_proxy_responses/cluster-info.json") as f: + config = PipelineConfig( + defaults_path=DEFAULTS_PATH, environment="development", kafka_rest_host=HOST + ) + self.proxy_wrapper = ProxyWrapper(pipeline_config=config) + + with open( + DEFAULTS_PATH / "kafka_rest_proxy_responses" / "cluster-info.json" + ) as f: cluster_response = json.load(f) responses.add( @@ -41,12 +48,6 @@ def setup(self): json=cluster_response, status=200, ) - config = PipelineConfig( - defaults_path=DEFAULTS_PATH, environment="development", kafka_rest_host=HOST - ) - self.proxy_wrapper = ProxyWrapper(pipeline_config=config) - - def test_should_get_cluster_id(self): assert self.proxy_wrapper.host == HOST assert self.proxy_wrapper.cluster_id == "cluster-1" @@ -57,7 +58,7 @@ def test_should_raise_exception_when_host_is_not_set(self): ProxyWrapper(pipeline_config=config) assert ( str(exception.value) - == "The Kafka Rest Proxy host is not set. Please set the host in the config.yaml using the kafka_rest_host property or set the environemt variable KPOPS_REST_PROXY_HOST." + == "The Kafka REST Proxy host is not set. Please set the host in the config.yaml using the kafka_rest_host property or set the environemt variable KPOPS_REST_PROXY_HOST." ) @patch("requests.post") diff --git a/tests/pipeline/test_pipeline.py b/tests/pipeline/test_pipeline.py index b6e8ef3c4..7eeeb9ac7 100644 --- a/tests/pipeline/test_pipeline.py +++ b/tests/pipeline/test_pipeline.py @@ -1,13 +1,9 @@ import os from pathlib import Path -from unittest.mock import MagicMock -import pytest -from pytest_mock import MockerFixture from typer.testing import CliRunner from kpops.cli.main import app -from kpops.component_handlers import ComponentHandlers from kpops.utils.yaml_loading import load_yaml_file runner = CliRunner() @@ -16,19 +12,6 @@ PIPELINE_BASE_DIR = str(RESOURCE_PATH.parent) -@pytest.fixture(autouse=True) -def component_handlers(mocker: MockerFixture): - def mock_setup_handlers(*args, **kwargs) -> ComponentHandlers: - return ComponentHandlers( - schema_handler=None, - app_handler=MagicMock(), - connector_handler=MagicMock(), - topic_handler=MagicMock(), - ) - - mocker.patch("kpops.cli.main.setup_handlers", mock_setup_handlers) - - class TestPipeline: def test_load_pipeline(self, tmpdir, snapshot): os.environ["KPOPS_ENVIRONMENT"] = "development"