From 330ecab0a9fbee060f7bb0d9617582c72775dc04 Mon Sep 17 00:00:00 2001 From: Ramin Gharib Date: Fri, 7 Jul 2023 16:39:54 +0200 Subject: [PATCH] Plural broker field in pipeline config (#278) --- config.yaml | 2 +- .../resources/pipeline-components/kafka-app.yaml | 2 +- .../resources/pipeline-components/pipeline.yaml | 6 +++--- .../resources/pipeline-components/producer.yaml | 2 +- .../resources/pipeline-components/streams-app.yaml | 2 +- .../pipeline-defaults/defaults-kafka-app.yaml | 2 +- docs/docs/resources/pipeline-defaults/defaults.yaml | 2 +- docs/docs/resources/pipeline_config/config.yaml | 4 ++-- docs/docs/schema/config.json | 13 +++++++------ examples/bakdata/atm-fraud-detection/config.yaml | 2 +- examples/bakdata/atm-fraud-detection/defaults.yaml | 2 +- kpops/cli/pipeline_config.py | 7 ++++--- kpops/components/base_components/kafka_connector.py | 2 +- tests/components/test_kafka_sink_connector.py | 2 +- tests/components/test_kafka_source_connector.py | 2 +- tests/pipeline/resources/custom-config/config.yaml | 2 +- tests/pipeline/resources/defaults.yaml | 2 +- .../resources/kafka-connect-sink-config/config.yaml | 2 +- .../resources/no-topics-defaults/defaults.yaml | 2 +- .../defaults.yaml | 2 +- .../pipeline-with-env-defaults/defaults.yaml | 2 +- 21 files changed, 33 insertions(+), 31 deletions(-) diff --git a/config.yaml b/config.yaml index 3c408af2c..46d0cf8b3 100644 --- a/config.yaml +++ b/config.yaml @@ -1,5 +1,5 @@ environment: development -broker: "http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092" +brokers: "http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092" kafka_connect_host: "http://localhost:8083" kafka_rest_host: "http://localhost:8082" schema_registry_url: "http://localhost:8081" diff --git a/docs/docs/resources/pipeline-components/kafka-app.yaml b/docs/docs/resources/pipeline-components/kafka-app.yaml index 3d08fd7f9..d2a94243d 100644 --- a/docs/docs/resources/pipeline-components/kafka-app.yaml +++ b/docs/docs/resources/pipeline-components/kafka-app.yaml @@ -7,7 +7,7 @@ # add the key-value pairs they need. app: # required streams: # required - brokers: ${broker} # required + brokers: ${brokers} # required schemaRegistryUrl: ${schema_registry_url} nameOverride: override-with-this-name # kafka-app-specific imageTag: "1.0.0" # Example values that are shared between streams-app and producer-app diff --git a/docs/docs/resources/pipeline-components/pipeline.yaml b/docs/docs/resources/pipeline-components/pipeline.yaml index ed3973bcb..8dfd42708 100644 --- a/docs/docs/resources/pipeline-components/pipeline.yaml +++ b/docs/docs/resources/pipeline-components/pipeline.yaml @@ -50,7 +50,7 @@ # add the key-value pairs they need. app: # required streams: # required - brokers: ${broker} # required + brokers: ${brokers} # required schemaRegistryUrl: ${schema_registry_url} nameOverride: override-with-this-name # kafka-app-specific imageTag: "1.0.0" # Example values that are shared between streams-app and producer-app @@ -96,7 +96,7 @@ app: # required # Streams Bootstrap streams section streams: # required, streams-app-specific - brokers: ${broker} # required + brokers: ${brokers} # required schemaRegistryUrl: ${schema_registry_url} inputTopics: - topic1 @@ -190,7 +190,7 @@ # https://github.com/bakdata/streams-bootstrap/tree/master/charts/producer-app app: # required streams: # required, producer-specific - brokers: ${broker} # required + brokers: ${brokers} # required schemaRegistryUrl: ${schema_registry_url} outputTopic: output_topic extraOutputTopics: diff --git a/docs/docs/resources/pipeline-components/producer.yaml b/docs/docs/resources/pipeline-components/producer.yaml index 47aa5888f..7b2df47bc 100644 --- a/docs/docs/resources/pipeline-components/producer.yaml +++ b/docs/docs/resources/pipeline-components/producer.yaml @@ -9,7 +9,7 @@ # https://github.com/bakdata/streams-bootstrap/tree/master/charts/producer-app app: # required streams: # required, producer-specific - brokers: ${broker} # required + brokers: ${brokers} # required schemaRegistryUrl: ${schema_registry_url} outputTopic: output_topic extraOutputTopics: diff --git a/docs/docs/resources/pipeline-components/streams-app.yaml b/docs/docs/resources/pipeline-components/streams-app.yaml index 2eb639fd2..ba1ef16c7 100644 --- a/docs/docs/resources/pipeline-components/streams-app.yaml +++ b/docs/docs/resources/pipeline-components/streams-app.yaml @@ -9,7 +9,7 @@ app: # required # Streams Bootstrap streams section streams: # required, streams-app-specific - brokers: ${broker} # required + brokers: ${brokers} # required schemaRegistryUrl: ${schema_registry_url} inputTopics: - topic1 diff --git a/docs/docs/resources/pipeline-defaults/defaults-kafka-app.yaml b/docs/docs/resources/pipeline-defaults/defaults-kafka-app.yaml index 3764c6990..a60fa24ad 100644 --- a/docs/docs/resources/pipeline-defaults/defaults-kafka-app.yaml +++ b/docs/docs/resources/pipeline-defaults/defaults-kafka-app.yaml @@ -5,6 +5,6 @@ kafka-app: app: streams: - brokers: ${broker} + brokers: ${brokers} schemaRegistryUrl: ${schema_registry_url} nameOverride: override-with-this-name diff --git a/docs/docs/resources/pipeline-defaults/defaults.yaml b/docs/docs/resources/pipeline-defaults/defaults.yaml index 01df7dbf7..d04b9444d 100644 --- a/docs/docs/resources/pipeline-defaults/defaults.yaml +++ b/docs/docs/resources/pipeline-defaults/defaults.yaml @@ -52,7 +52,7 @@ kubernetes-app: kafka-app: app: streams: - brokers: ${broker} + brokers: ${brokers} schemaRegistryUrl: ${schema_registry_url} nameOverride: override-with-this-name # diff --git a/docs/docs/resources/pipeline_config/config.yaml b/docs/docs/resources/pipeline_config/config.yaml index ac278bce4..3b08c0708 100644 --- a/docs/docs/resources/pipeline_config/config.yaml +++ b/docs/docs/resources/pipeline_config/config.yaml @@ -8,9 +8,9 @@ defaults_path: . # pipeline_development.yaml for environment=development). # REQUIRED environment: development -# The Kafka broker address. +# The Kafka brokers address. # REQUIRED -broker: "http://localhost:9092" +brokers: "http://broker1:9092,http://broker2:9092" # The name of the defaults file and the prefix of the defaults environment file. defaults_filename_prefix: defaults # Configures topic names. diff --git a/docs/docs/schema/config.json b/docs/docs/schema/config.json index 8d5db2131..87d56f98a 100644 --- a/docs/docs/schema/config.json +++ b/docs/docs/schema/config.json @@ -39,13 +39,14 @@ "additionalProperties": false, "description": "Pipeline configuration unrelated to the components.", "properties": { - "broker": { - "description": "The Kafka broker address.", - "env": "KPOPS_KAFKA_BROKER", + "brokers": { + "description": "The comma separated Kafka brokers address.", + "env": "KPOPS_KAFKA_BROKERS", "env_names": [ - "kpops_kafka_broker" + "kpops_kafka_brokers" ], - "title": "Broker", + "example": "broker1:9092,broker2:9092,broker3:9092", + "title": "Brokers", "type": "string" }, "create_namespace": { @@ -187,7 +188,7 @@ }, "required": [ "environment", - "broker" + "brokers" ], "title": "PipelineConfig", "type": "object" diff --git a/examples/bakdata/atm-fraud-detection/config.yaml b/examples/bakdata/atm-fraud-detection/config.yaml index 8cbbebaf1..e3742ded9 100644 --- a/examples/bakdata/atm-fraud-detection/config.yaml +++ b/examples/bakdata/atm-fraud-detection/config.yaml @@ -4,7 +4,7 @@ topic_name_config: default_error_topic_name: "${pipeline_name}-${component_name}-dead-letter-topic" default_output_topic_name: "${pipeline_name}-${component_name}-topic" -broker: "http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092" +brokers: "http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092" schema_registry_url: "http://k8kafka-cp-schema-registry.kpops.svc.cluster.local:8081" diff --git a/examples/bakdata/atm-fraud-detection/defaults.yaml b/examples/bakdata/atm-fraud-detection/defaults.yaml index 67b9be982..cc1f07a25 100644 --- a/examples/bakdata/atm-fraud-detection/defaults.yaml +++ b/examples/bakdata/atm-fraud-detection/defaults.yaml @@ -10,7 +10,7 @@ kafka-connector: kafka-app: app: streams: - brokers: ${broker} + brokers: ${brokers} schemaRegistryUrl: ${schema_registry_url} optimizeLeaveGroupBehavior: false diff --git a/kpops/cli/pipeline_config.py b/kpops/cli/pipeline_config.py index d95021c8d..f65022f8f 100644 --- a/kpops/cli/pipeline_config.py +++ b/kpops/cli/pipeline_config.py @@ -38,10 +38,11 @@ class PipelineConfig(BaseSettings): description="The environment you want to generate and deploy the pipeline to. " "Suffix your environment files with this value (e.g. defaults_development.yaml for environment=development).", ) - broker: str = Field( + brokers: str = Field( default=..., - env=f"{ENV_PREFIX}KAFKA_BROKER", - description="The Kafka broker address.", + env=f"{ENV_PREFIX}KAFKA_BROKERS", + description="The comma separated Kafka brokers address.", + example="broker1:9092,broker2:9092,broker3:9092", ) defaults_filename_prefix: str = Field( default="defaults", diff --git a/kpops/components/base_components/kafka_connector.py b/kpops/components/base_components/kafka_connector.py index c524d6161..6f5bdeb7d 100644 --- a/kpops/components/base_components/kafka_connector.py +++ b/kpops/components/base_components/kafka_connector.py @@ -271,7 +271,7 @@ def _get_kafka_connect_resetter_values( **KafkaConnectResetterValues( config=KafkaConnectResetterConfig( connector=connector_name, - brokers=self.config.broker, + brokers=self.config.brokers, **kwargs, ), connector_type=connector_type.value, diff --git a/tests/components/test_kafka_sink_connector.py b/tests/components/test_kafka_sink_connector.py index 00f6817a4..e95d6d3f0 100644 --- a/tests/components/test_kafka_sink_connector.py +++ b/tests/components/test_kafka_sink_connector.py @@ -48,7 +48,7 @@ def config(self) -> PipelineConfig: default_error_topic_name="${component_type}-error-topic", default_output_topic_name="${component_type}-output-topic", ), - broker="broker:9092", + brokers="broker:9092", helm_diff_config=HelmDiffConfig(), ) diff --git a/tests/components/test_kafka_source_connector.py b/tests/components/test_kafka_source_connector.py index b93477c4a..7f0c3779b 100644 --- a/tests/components/test_kafka_source_connector.py +++ b/tests/components/test_kafka_source_connector.py @@ -41,7 +41,7 @@ def config(slef) -> PipelineConfig: default_error_topic_name="${component_type}-error-topic", default_output_topic_name="${component_type}-output-topic", ), - broker="broker:9092", + brokers="broker:9092", helm_diff_config=HelmDiffConfig(), ) diff --git a/tests/pipeline/resources/custom-config/config.yaml b/tests/pipeline/resources/custom-config/config.yaml index c75372644..69e31d9fc 100644 --- a/tests/pipeline/resources/custom-config/config.yaml +++ b/tests/pipeline/resources/custom-config/config.yaml @@ -3,7 +3,7 @@ defaults_path: ../no-topics-defaults topic_name_config: default_error_topic_name: "${component_name}-dead-letter-topic" default_output_topic_name: "${component_name}-test-topic" -broker: "http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092" +brokers: "http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092" kafka_connect_host: "http://localhost:8083" kafka_rest_host: "http://localhost:8082" schema_registry_url: "http://localhost:8081" diff --git a/tests/pipeline/resources/defaults.yaml b/tests/pipeline/resources/defaults.yaml index 1407fbf37..5946c24f2 100644 --- a/tests/pipeline/resources/defaults.yaml +++ b/tests/pipeline/resources/defaults.yaml @@ -5,7 +5,7 @@ kubernetes-app: kafka-app: app: streams: - brokers: "${broker}" + brokers: "${brokers}" schema_registry_url: "${schema_registry_url}" version: "2.4.2" diff --git a/tests/pipeline/resources/kafka-connect-sink-config/config.yaml b/tests/pipeline/resources/kafka-connect-sink-config/config.yaml index bfd84b1ea..6b7c754ab 100644 --- a/tests/pipeline/resources/kafka-connect-sink-config/config.yaml +++ b/tests/pipeline/resources/kafka-connect-sink-config/config.yaml @@ -1,6 +1,6 @@ environment: development defaults_path: .. -broker: "broker:9092" +brokers: "broker:9092" topic_name_config: default_error_topic_name: ${component_type}-error-topic default_output_topic_name: ${component_type}-output-topic diff --git a/tests/pipeline/resources/no-topics-defaults/defaults.yaml b/tests/pipeline/resources/no-topics-defaults/defaults.yaml index 2e4ac9924..031e85936 100644 --- a/tests/pipeline/resources/no-topics-defaults/defaults.yaml +++ b/tests/pipeline/resources/no-topics-defaults/defaults.yaml @@ -1,7 +1,7 @@ kafka-app: app: streams: - brokers: "${broker}" + brokers: "${brokers}" schemaRegistryUrl: "${schema_registry_url}" producer: diff --git a/tests/pipeline/resources/pipeline-component-should-have-prefix/defaults.yaml b/tests/pipeline/resources/pipeline-component-should-have-prefix/defaults.yaml index 5bb80bf36..dfbe23db9 100644 --- a/tests/pipeline/resources/pipeline-component-should-have-prefix/defaults.yaml +++ b/tests/pipeline/resources/pipeline-component-should-have-prefix/defaults.yaml @@ -7,5 +7,5 @@ kubernetes-app: kafka-app: app: streams: - brokers: ${broker} + brokers: ${brokers} schemaRegistryUrl: ${schema_registry_url} diff --git a/tests/pipeline/resources/pipeline-with-env-defaults/defaults.yaml b/tests/pipeline/resources/pipeline-with-env-defaults/defaults.yaml index 13a6b153d..5de1d8be7 100644 --- a/tests/pipeline/resources/pipeline-with-env-defaults/defaults.yaml +++ b/tests/pipeline/resources/pipeline-with-env-defaults/defaults.yaml @@ -4,7 +4,7 @@ kubernetes-app: kafka-app: app: streams: - brokers: "${broker}" + brokers: "${brokers}" schemaRegistryUrl: "${schema_registry_url}" producer: {} # inherits from kafka