Skip to content

Commit

Permalink
Fix missing extra input topics (#230)
Browse files Browse the repository at this point in the history
  • Loading branch information
disrupted authored Jun 7, 2023
1 parent 4ee112d commit da2b0e4
Show file tree
Hide file tree
Showing 13 changed files with 89 additions and 161 deletions.
4 changes: 2 additions & 2 deletions docs/docs/schema/pipeline.json
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@
"type": "object"
},
"InputTopicTypes": {
"description": "Input topic types\n\n input (input topic), input_pattern (input pattern topic), extra (extra topic), extra_pattern (extra pattern topic).\n Every extra topic must have a role.\n ",
"description": "Input topic types\n\ninput (input topic), input_pattern (input pattern topic), extra (extra topic), extra_pattern (extra pattern topic).\nEvery extra topic must have a role.",
"enum": [
"input",
"extra",
Expand Down Expand Up @@ -641,7 +641,7 @@
"type": "object"
},
"OutputTopicTypes": {
"description": "Types of output topic\n\n Error (error topic), output (output topic), and extra topics. Every extra topic must have a role.\n ",
"description": "Types of output topic\n\nError (error topic), output (output topic), and extra topics. Every extra topic must have a role.",
"enum": [
"error",
"output",
Expand Down
2 changes: 2 additions & 0 deletions examples/bakdata/atm-fraud-detection/defaults.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ streams-app:
app:
labels:
pipeline: ${pipeline_name}
streams:
optimizeLeaveGroupBehavior: false
to:
topics:
${error_topic_name}:
Expand Down
1 change: 0 additions & 1 deletion examples/bakdata/atm-fraud-detection/pipeline.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,6 @@
- type: kafka-sink-connector
name: postgresql-connector
app:
name: postgresql-connector
connector.class: io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max: 1
topics: ${pipeline_name}-account-linker-topic
Expand Down
6 changes: 3 additions & 3 deletions kpops/component_handlers/kafka_connect/connect_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ def create_connector(
:param kafka_connect_config: The config of the connector
:return: The current connector info if successful
"""
config_json = kafka_connect_config.dict(exclude_none=True, exclude_unset=True)
config_json = kafka_connect_config.dict(exclude_none=True)
connect_data = {"name": connector_name, "config": config_json}
response = requests.post(
url=f"{self._host}/connectors", headers=HEADERS, json=connect_data
Expand Down Expand Up @@ -99,7 +99,7 @@ def update_connector_config(
:param kafka_connect_config: Configuration parameters for the connector.
:return: Information about the connector after the change has been made.
"""
config_json = kafka_connect_config.dict(exclude_none=True, exclude_unset=True)
config_json = kafka_connect_config.dict(exclude_none=True)
response = requests.put(
url=f"{self._host}/connectors/{connector_name}/config",
headers=HEADERS,
Expand All @@ -126,7 +126,7 @@ def update_connector_config(
def get_connector_config(
cls, connector_name: str, config: KafkaConnectConfig
) -> dict[str, Any]:
connector_config = config.dict(exclude_none=True, exclude_unset=True)
connector_config = config.dict(exclude_none=True)
if (name := connector_config.get("name")) and name != connector_name:
raise ValueError("Connector name should be the same as component name")
connector_config["name"] = connector_name
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ def __dry_run_connector_creation(
):
log.info(f"Updating config:\n{diff}")

log.debug(kafka_connect_config.dict(exclude_unset=True, exclude_none=True))
log.debug(kafka_connect_config.dict(exclude_none=True))
log.debug(f"PUT /connectors/{connector_name}/config HTTP/1.1")
log.debug(f"HOST: {self._connect_wrapper.host}")
except ConnectorNotFoundException:
Expand Down
1 change: 0 additions & 1 deletion kpops/component_handlers/topic/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,6 @@ def __prepare_body(cls, topic_name: str, topic_config: TopicConfig) -> TopicSpec
"replication_factor": True,
"configs": True,
},
exclude_unset=True,
exclude_none=True,
)
configs = []
Expand Down
2 changes: 1 addition & 1 deletion kpops/component_handlers/topic/proxy_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ def create_topic(self, topic_spec: TopicSpec) -> None:
response = requests.post(
url=f"{self._host}/v3/clusters/{self.cluster_id}/topics",
headers=HEADERS,
json=topic_spec.dict(exclude_unset=True, exclude_none=True),
json=topic_spec.dict(exclude_none=True),
)
if response.status_code == requests.status_codes.codes.created:
log.info(f"Topic {topic_spec.topic_name} created.")
Expand Down
2 changes: 1 addition & 1 deletion kpops/components/base_components/kubernetes_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ def to_helm_values(self) -> dict:
:returns: Thte values to be used by Helm
:rtype: dict
"""
return self.app.dict(by_alias=True, exclude_none=True, exclude_unset=True)
return self.app.dict(by_alias=True, exclude_none=True, exclude_defaults=True)

def print_helm_diff(self, stdout: str) -> None:
"""Print the diff of the last and current release of this component
Expand Down
Loading

0 comments on commit da2b0e4

Please sign in to comment.