Skip to content

Commit

Permalink
Fix generate connecting to Kafka REST proxy (#41)
Browse files Browse the repository at this point in the history
  • Loading branch information
disrupted authored Dec 22, 2022
1 parent d1c7808 commit 5b33489
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 51 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,5 @@
.DS_Store
.envrc
.vscode
pipelines/
defaults/
45 changes: 19 additions & 26 deletions kpops/component_handlers/topic/proxy_wrapper.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import logging
from functools import cached_property

import requests

Expand All @@ -25,23 +26,30 @@ class ProxyWrapper:
"""

def __init__(self, pipeline_config: PipelineConfig) -> None:
"""
Default constructor. Sets the cluster ID by sending a requests to the rest proxy.
More information about the cluster ID can be found here:
https://docs.confluent.io/platform/current/kafka-rest/api.html#cluster-v3
"""
if not pipeline_config.kafka_rest_host:
raise ValueError(
"The Kafka Rest Proxy host is not set. Please set the host in the config.yaml using the kafka_rest_host property or set the environemt variable KPOPS_REST_PROXY_HOST."
"The Kafka REST Proxy host is not set. Please set the host in the config.yaml using the kafka_rest_host property or set the environemt variable KPOPS_REST_PROXY_HOST."
)

# TODO: Initialize the cluster id at the beginning
self._cluster_id = self.__get_cluster_id(host=pipeline_config.kafka_rest_host)
self._host = pipeline_config.kafka_rest_host

@property
@cached_property
def cluster_id(self) -> str:
return self._cluster_id
"""
Gets the Kafka cluster ID by sending a requests to Kafka REST proxy.
More information about the cluster ID can be found here:
https://docs.confluent.io/platform/current/kafka-rest/api.html#cluster-v3
Currently both Kafka and Kafka REST Proxy are only aware of the Kafka cluster pointed at by the
bootstrap.servers configuration. Therefore, only one Kafka cluster will be returned.
:return: The Kafka cluster ID.
"""
response = requests.get(url=f"{self._host}/v3/clusters")
if response.status_code == requests.status_codes.codes.ok:
cluster_information = response.json()
return cluster_information["data"][0]["cluster_id"]

raise KafkaRestProxyError(response)

@property
def host(self) -> str:
Expand All @@ -54,7 +62,7 @@ def create_topic(self, topic_spec: TopicSpec) -> None:
:param topic_spec: The topic specification.
"""
response = requests.post(
url=f"{self._host}/v3/clusters/{self._cluster_id}/topics",
url=f"{self._host}/v3/clusters/{self.cluster_id}/topics",
headers=HEADERS,
json=topic_spec.dict(exclude_unset=True, exclude_none=True),
)
Expand Down Expand Up @@ -169,18 +177,3 @@ def get_broker_config(self) -> BrokerConfigResponse:
return BrokerConfigResponse(**response.json())

raise KafkaRestProxyError(response)

@classmethod
def __get_cluster_id(cls, host: str) -> str:
"""
Currently both Kafka and Kafka REST Proxy are only aware of the Kafka cluster pointed at by the
bootstrap.servers configuration. Therefore, only one Kafka cluster will be returned.
:param host:
:return:
"""
response = requests.get(url=f"{host}/v3/clusters")
if response.status_code == requests.status_codes.codes.ok:
cluster_information = response.json()
return cluster_information["data"][0]["cluster_id"]

raise KafkaRestProxyError(response)
17 changes: 9 additions & 8 deletions tests/component_handlers/topic/test_proxy_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,14 @@ def log_debug_mock(self, mocker: MockerFixture) -> MagicMock:
@pytest.fixture(autouse=True)
@responses.activate
def setup(self):
with open(DEFAULTS_PATH / "kafka_rest_proxy_responses/cluster-info.json") as f:
config = PipelineConfig(
defaults_path=DEFAULTS_PATH, environment="development", kafka_rest_host=HOST
)
self.proxy_wrapper = ProxyWrapper(pipeline_config=config)

with open(
DEFAULTS_PATH / "kafka_rest_proxy_responses" / "cluster-info.json"
) as f:
cluster_response = json.load(f)

responses.add(
Expand All @@ -41,12 +48,6 @@ def setup(self):
json=cluster_response,
status=200,
)
config = PipelineConfig(
defaults_path=DEFAULTS_PATH, environment="development", kafka_rest_host=HOST
)
self.proxy_wrapper = ProxyWrapper(pipeline_config=config)

def test_should_get_cluster_id(self):
assert self.proxy_wrapper.host == HOST
assert self.proxy_wrapper.cluster_id == "cluster-1"

Expand All @@ -57,7 +58,7 @@ def test_should_raise_exception_when_host_is_not_set(self):
ProxyWrapper(pipeline_config=config)
assert (
str(exception.value)
== "The Kafka Rest Proxy host is not set. Please set the host in the config.yaml using the kafka_rest_host property or set the environemt variable KPOPS_REST_PROXY_HOST."
== "The Kafka REST Proxy host is not set. Please set the host in the config.yaml using the kafka_rest_host property or set the environemt variable KPOPS_REST_PROXY_HOST."
)

@patch("requests.post")
Expand Down
17 changes: 0 additions & 17 deletions tests/pipeline/test_pipeline.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,9 @@
import os
from pathlib import Path
from unittest.mock import MagicMock

import pytest
from pytest_mock import MockerFixture
from typer.testing import CliRunner

from kpops.cli.main import app
from kpops.component_handlers import ComponentHandlers
from kpops.utils.yaml_loading import load_yaml_file

runner = CliRunner()
Expand All @@ -16,19 +12,6 @@
PIPELINE_BASE_DIR = str(RESOURCE_PATH.parent)


@pytest.fixture(autouse=True)
def component_handlers(mocker: MockerFixture):
def mock_setup_handlers(*args, **kwargs) -> ComponentHandlers:
return ComponentHandlers(
schema_handler=None,
app_handler=MagicMock(),
connector_handler=MagicMock(),
topic_handler=MagicMock(),
)

mocker.patch("kpops.cli.main.setup_handlers", mock_setup_handlers)


class TestPipeline:
def test_load_pipeline(self, tmpdir, snapshot):
os.environ["KPOPS_ENVIRONMENT"] = "development"
Expand Down

0 comments on commit 5b33489

Please sign in to comment.