From 3865e1344c4358b4c25f044fbb0d80f267453536 Mon Sep 17 00:00:00 2001 From: Philipp Schirmer Date: Fri, 26 Jan 2024 09:18:32 +0100 Subject: [PATCH 01/12] Add --application-id parameter --- azure-pipelines.yml | 2 ++ build.gradle.kts | 4 +-- .../templates/job.yaml | 4 +++ charts/streams-app-cleanup-job/values.yaml | 4 +++ charts/streams-app/templates/deployment.yaml | 4 +++ charts/streams-app/values.yaml | 1 + .../kafka/KafkaStreamsApplication.java | 29 ++++++++++++++++--- .../com/bakdata/kafka/CleanUpRunnerTest.java | 4 +-- .../kafka/integration/StreamsCleanUpTest.java | 24 +++++++-------- .../kafka/util/TopologyInformationTest.java | 6 ++-- 10 files changed, 59 insertions(+), 23 deletions(-) diff --git a/azure-pipelines.yml b/azure-pipelines.yml index 989872b1..fc9eca7b 100644 --- a/azure-pipelines.yml +++ b/azure-pipelines.yml @@ -23,6 +23,8 @@ resources: jobs: - template: azure/gradle/build.yml@templates + parameters: + jdkVersion: '1.17' - template: azure/gradle/create_tag_version.yml@templates - template: azure/gradle/upload_release.yml@templates - template: azure/gradle/upload_snapshot.yml@templates diff --git a/build.gradle.kts b/build.gradle.kts index 2e6f7933..2184f083 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -1,7 +1,7 @@ plugins { id("net.researchgate.release") version "3.0.2" - id("com.bakdata.sonar") version "1.1.7" - id("com.bakdata.sonatype") version "1.1.7" + id("com.bakdata.sonar") version "1.1.9" + id("com.bakdata.sonatype") version "1.1.9" id("org.hildan.github.changelog") version "1.12.1" id("io.freefair.lombok") version "6.6.1" } diff --git a/charts/streams-app-cleanup-job/templates/job.yaml b/charts/streams-app-cleanup-job/templates/job.yaml index d074490a..eaee0047 100644 --- a/charts/streams-app-cleanup-job/templates/job.yaml +++ b/charts/streams-app-cleanup-job/templates/job.yaml @@ -112,6 +112,10 @@ spec: - name: "{{ .Values.configurationEnvPrefix }}_EXTRA_INPUT_PATTERNS" value: "{{- range $key, $value := 
.Values.streams.extraInputPatterns }}{{ $key }}={{ $value }},{{- end }}" {{- end }} + {{- if and (.Values.streams.passApplicationId) (hasKey .Values.autoscaling "consumerGroup") }} + - name: "{{ .Values.configurationEnvPrefix }}_APPLICATION_ID" + value: {{ .Values.autoscaling.consumerGroup | quote }} + {{- end }} {{- range $key, $value := .Values.secrets }} - name: "{{ $key }}" valueFrom: diff --git a/charts/streams-app-cleanup-job/values.yaml b/charts/streams-app-cleanup-job/values.yaml index 9fa76774..cb091e05 100644 --- a/charts/streams-app-cleanup-job/values.yaml +++ b/charts/streams-app-cleanup-job/values.yaml @@ -15,6 +15,7 @@ configurationEnvPrefix: "APP" streams: # brokers: "test:9092" # schemaRegistryUrl: "url:1234" + passApplicationId: false config: {} # max.poll.records: 500 # Note that YAML may convert large integers to scientific notation. Use Strings to avoid this. @@ -59,6 +60,9 @@ labels: {} # serviceAccountName: foo +autoscaling: {} + # consumerGroup: foo + tolerations: [] # - key: "foo" # operator: "Exists" diff --git a/charts/streams-app/templates/deployment.yaml b/charts/streams-app/templates/deployment.yaml index cd900fa2..9f357aad 100644 --- a/charts/streams-app/templates/deployment.yaml +++ b/charts/streams-app/templates/deployment.yaml @@ -175,6 +175,10 @@ spec: - name: "{{ .Values.configurationEnvPrefix }}_EXTRA_INPUT_PATTERNS" value: "{{- range $key, $value := .Values.streams.extraInputPatterns }}{{ $key }}={{ $value }},{{- end }}" {{- end }} + {{- if and (.Values.streams.passApplicationId) (hasKey .Values.autoscaling "consumerGroup") }} + - name: "{{ .Values.configurationEnvPrefix }}_APPLICATION_ID" + value: {{ .Values.autoscaling.consumerGroup | quote }} + {{- end }} {{- range $key, $value := .Values.secrets }} - name: "{{ $key }}" valueFrom: diff --git a/charts/streams-app/values.yaml b/charts/streams-app/values.yaml index 0309adb6..479bb98b 100644 --- a/charts/streams-app/values.yaml +++ b/charts/streams-app/values.yaml @@ -27,6 +27,7 
@@ streams: # schemaRegistryUrl: "url:1234" staticMembership: false optimizeLeaveGroupBehavior: true + passApplicationId: false config: {} # max.poll.records: 500 # Note that YAML may convert large integers to scientific notation. Use Strings to avoid this. diff --git a/streams-bootstrap/src/main/java/com/bakdata/kafka/KafkaStreamsApplication.java b/streams-bootstrap/src/main/java/com/bakdata/kafka/KafkaStreamsApplication.java index 33427103..4e66b77e 100644 --- a/streams-bootstrap/src/main/java/com/bakdata/kafka/KafkaStreamsApplication.java +++ b/streams-bootstrap/src/main/java/com/bakdata/kafka/KafkaStreamsApplication.java @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2023 bakdata + * Copyright (c) 2024 bakdata * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -94,6 +94,8 @@ public abstract class KafkaStreamsApplication extends KafkaApplication implement @CommandLine.Option(names = "--volatile-group-instance-id", arity = "0..1", description = "Whether the group instance id is volatile, i.e., it will change on a Streams shutdown.") private boolean volatileGroupInstanceId; + @CommandLine.Option(names = "--application-id", description = "Application ID to use for Kafka Streams") + private String applicationId; private KafkaStreams streams; private Throwable lastException; @@ -159,7 +161,9 @@ public void close() { * This must be set to a unique value for every application interacting with your kafka cluster to ensure internal * state encapsulation. 
Could be set to: className-inputTopic-outputTopic */ - public abstract String getUniqueAppId(); + public String getUniqueAppId() { + return null; + } /** * Create the topology of the Kafka Streams app @@ -235,6 +239,23 @@ protected StreamsUncaughtExceptionHandler getUncaughtExceptionHandler() { return new DefaultStreamsUncaughtExceptionHandler(); } + public final String getStreamsApplicationId() { + final String uniqueAppId = this.getUniqueAppId(); + if(uniqueAppId == null) { + if(this.applicationId == null) { + throw new IllegalArgumentException("Must pass --application-id or implement #getUniqueAppId()"); + } + return this.applicationId; + } + if(this.applicationId == null) { + return uniqueAppId; + } + if(!uniqueAppId.equals(this.applicationId)) { + throw new IllegalArgumentException("Application ID provided via --application-id does not match #getUniqueAppId()"); + } + return uniqueAppId; + } + /** *

This method should give a default configuration to run your streaming application with.

* If {@link KafkaApplication#schemaRegistryUrl} is set {@link SpecificAvroSerde} is set as the default key, value @@ -271,7 +292,7 @@ protected Properties createKafkaProperties() { kafkaConfig.setProperty(StreamsConfig.producerPrefix(ProducerConfig.COMPRESSION_TYPE_CONFIG), "gzip"); // topology - kafkaConfig.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, this.getUniqueAppId()); + kafkaConfig.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, this.getApplicationId()); this.configureDefaultSerde(kafkaConfig); kafkaConfig.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, this.getBrokers()); @@ -322,7 +343,7 @@ protected void runCleanUp() { try (final ImprovedAdminClient adminClient = this.createAdminClient()) { final CleanUpRunner cleanUpRunner = CleanUpRunner.builder() .topology(this.createTopology()) - .appId(this.getUniqueAppId()) + .appId(this.getApplicationId()) .adminClient(adminClient) .streams(this.streams) .build(); diff --git a/streams-bootstrap/src/test/java/com/bakdata/kafka/CleanUpRunnerTest.java b/streams-bootstrap/src/test/java/com/bakdata/kafka/CleanUpRunnerTest.java index 667028a7..1553cc3e 100644 --- a/streams-bootstrap/src/test/java/com/bakdata/kafka/CleanUpRunnerTest.java +++ b/streams-bootstrap/src/test/java/com/bakdata/kafka/CleanUpRunnerTest.java @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2023 bakdata + * Copyright (c) 2024 bakdata * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -40,7 +40,7 @@ class CleanUpRunnerTest { void createTemporaryPropertiesFile() throws IOException { final WordCount wordCount = new WordCount(); wordCount.setInputTopics(List.of("input")); - final File file = CleanUpRunner.createTemporaryPropertiesFile(wordCount.getUniqueAppId(), + final File file = CleanUpRunner.createTemporaryPropertiesFile(wordCount.getStreamsApplicationId(), wordCount.getKafkaProperties()); assertThat(file).exists(); diff 
--git a/streams-bootstrap/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpTest.java b/streams-bootstrap/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpTest.java index 8e21f9eb..03be66ee 100644 --- a/streams-bootstrap/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpTest.java +++ b/streams-bootstrap/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpTest.java @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2023 bakdata + * Copyright (c) 2024 bakdata * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -172,7 +172,7 @@ void shouldDeleteConsumerGroup() throws InterruptedException { this.softly.assertThat(adminClient.listConsumerGroups().all().get(TIMEOUT_SECONDS, TimeUnit.SECONDS)) .extracting(ConsumerGroupListing::groupId) .as("Consumer group exists") - .contains(this.app.getUniqueAppId()); + .contains(this.app.getStreamsApplicationId()); } catch (final TimeoutException | ExecutionException e) { throw new RuntimeException("Error retrieving consumer groups", e); } @@ -184,7 +184,7 @@ void shouldDeleteConsumerGroup() throws InterruptedException { this.softly.assertThat(adminClient.listConsumerGroups().all().get(TIMEOUT_SECONDS, TimeUnit.SECONDS)) .extracting(ConsumerGroupListing::groupId) .as("Consumer group is deleted") - .doesNotContain(this.app.getUniqueAppId()); + .doesNotContain(this.app.getStreamsApplicationId()); } catch (final TimeoutException | ExecutionException e) { throw new RuntimeException("Error retrieving consumer groups", e); } @@ -211,7 +211,7 @@ void shouldNotThrowAnErrorIfConsumerGroupDoesNotExist() throws InterruptedExcept this.softly.assertThat(adminClient.listConsumerGroups().all().get(TIMEOUT_SECONDS, TimeUnit.SECONDS)) .extracting(ConsumerGroupListing::groupId) .as("Consumer group exists") - .contains(this.app.getUniqueAppId()); + .contains(this.app.getStreamsApplicationId()); } catch (final 
TimeoutException | ExecutionException e) { throw new RuntimeException("Error retrieving consumer groups", e); } @@ -219,12 +219,12 @@ void shouldNotThrowAnErrorIfConsumerGroupDoesNotExist() throws InterruptedExcept delay(TIMEOUT_SECONDS, TimeUnit.SECONDS); try (final AdminClient adminClient = AdminClient.create(this.app.getKafkaProperties())) { - adminClient.deleteConsumerGroups(List.of(this.app.getUniqueAppId())).all() + adminClient.deleteConsumerGroups(List.of(this.app.getStreamsApplicationId())).all() .get(TIMEOUT_SECONDS, TimeUnit.SECONDS); this.softly.assertThat(adminClient.listConsumerGroups().all().get(TIMEOUT_SECONDS, TimeUnit.SECONDS)) .extracting(ConsumerGroupListing::groupId) .as("Consumer group is deleted") - .doesNotContain(this.app.getUniqueAppId()); + .doesNotContain(this.app.getStreamsApplicationId()); } catch (final TimeoutException | ExecutionException e) { throw new RuntimeException("Error deleting consumer group", e); } @@ -237,9 +237,9 @@ void shouldDeleteInternalTopics() throws InterruptedException { final String inputTopic = this.app.getInputTopic(); final String internalTopic = - this.app.getUniqueAppId() + "-KSTREAM-AGGREGATE-STATE-STORE-0000000008-repartition"; + this.app.getStreamsApplicationId() + "-KSTREAM-AGGREGATE-STATE-STORE-0000000008-repartition"; final String backingTopic = - this.app.getUniqueAppId() + "-KSTREAM-REDUCE-STATE-STORE-0000000003-changelog"; + this.app.getStreamsApplicationId() + "-KSTREAM-REDUCE-STATE-STORE-0000000003-changelog"; final String manualTopic = ComplexTopologyApplication.THROUGH_TOPIC; final TestRecord testRecord = TestRecord.newBuilder().setContent("key 1").build(); @@ -410,9 +410,9 @@ void shouldDeleteSchemaOfInternalTopics() final String inputSubject = this.app.getInputTopic() + "-value"; final String internalSubject = - this.app.getUniqueAppId() + "-KSTREAM-AGGREGATE-STATE-STORE-0000000008-repartition" + "-value"; + this.app.getStreamsApplicationId() + 
"-KSTREAM-AGGREGATE-STATE-STORE-0000000008-repartition" + "-value"; final String backingSubject = - this.app.getUniqueAppId() + "-KSTREAM-REDUCE-STATE-STORE-0000000003-changelog" + "-value"; + this.app.getStreamsApplicationId() + "-KSTREAM-REDUCE-STATE-STORE-0000000003-changelog" + "-value"; final String manualSubject = ComplexTopologyApplication.THROUGH_TOPIC + "-value"; final SchemaRegistryClient client = this.schemaRegistryMockExtension.getSchemaRegistryClient(); @@ -469,7 +469,7 @@ void shouldCallCleanupHookForInternalTopics() { this.app = this.createComplexCleanUpHookApplication(); this.runCleanUp(); - final String uniqueAppId = this.app.getUniqueAppId(); + final String uniqueAppId = this.app.getStreamsApplicationId(); verify(this.topicCleanUpHook).accept(uniqueAppId + "-KSTREAM-AGGREGATE-STATE-STORE-0000000008-repartition"); verify(this.topicCleanUpHook).accept(uniqueAppId + "-KSTREAM-AGGREGATE-STATE-STORE-0000000008-changelog"); verify(this.topicCleanUpHook).accept(uniqueAppId + "-KSTREAM-REDUCE-STATE-STORE-0000000003-changelog"); @@ -481,7 +481,7 @@ void shouldCallCleanUpHookForAllTopics() { this.app = this.createComplexCleanUpHookApplication(); this.runCleanUpWithDeletion(); - final String uniqueAppId = this.app.getUniqueAppId(); + final String uniqueAppId = this.app.getStreamsApplicationId(); verify(this.topicCleanUpHook).accept(uniqueAppId + "-KSTREAM-AGGREGATE-STATE-STORE-0000000008-repartition"); verify(this.topicCleanUpHook).accept(uniqueAppId + "-KSTREAM-AGGREGATE-STATE-STORE-0000000008-changelog"); verify(this.topicCleanUpHook).accept(uniqueAppId + "-KSTREAM-REDUCE-STATE-STORE-0000000003-changelog"); diff --git a/streams-bootstrap/src/test/java/com/bakdata/kafka/util/TopologyInformationTest.java b/streams-bootstrap/src/test/java/com/bakdata/kafka/util/TopologyInformationTest.java index 6ea870a3..1fc24a5a 100644 --- a/streams-bootstrap/src/test/java/com/bakdata/kafka/util/TopologyInformationTest.java +++ 
b/streams-bootstrap/src/test/java/com/bakdata/kafka/util/TopologyInformationTest.java @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2023 bakdata + * Copyright (c) 2024 bakdata * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -49,7 +49,7 @@ void setup() { this.app = new ComplexTopologyApplication(); this.app.setInputTopics(List.of("input", "input2")); this.app.setOutputTopic("output"); - this.topologyInformation = new TopologyInformation(this.app.createTopology(), this.app.getUniqueAppId()); + this.topologyInformation = new TopologyInformation(this.app.createTopology(), this.app.getStreamsApplicationId()); } @Test @@ -131,7 +131,7 @@ void shouldNotReturnInputTopics() { void shouldReturnAllInternalTopics() { assertThat(this.topologyInformation.getInternalTopics()) .hasSize(3) - .allMatch(topic -> topic.contains("-KSTREAM-") && topic.startsWith(this.app.getUniqueAppId()) + .allMatch(topic -> topic.contains("-KSTREAM-") && topic.startsWith(this.app.getStreamsApplicationId()) || topic.startsWith("KSTREAM-")) .allMatch(topic -> topic.endsWith("-changelog") || topic.endsWith("-repartition")); } From eafd69e994d457c595addb6187b81e8e77b4efc2 Mon Sep 17 00:00:00 2001 From: Philipp Schirmer Date: Fri, 26 Jan 2024 09:21:26 +0100 Subject: [PATCH 02/12] Add --application-id parameter --- charts/streams-app-cleanup-job/templates/job.yaml | 2 +- charts/streams-app-cleanup-job/values.yaml | 1 - charts/streams-app/templates/deployment.yaml | 2 +- charts/streams-app/values.yaml | 1 - 4 files changed, 2 insertions(+), 4 deletions(-) diff --git a/charts/streams-app-cleanup-job/templates/job.yaml b/charts/streams-app-cleanup-job/templates/job.yaml index eaee0047..60b885e9 100644 --- a/charts/streams-app-cleanup-job/templates/job.yaml +++ b/charts/streams-app-cleanup-job/templates/job.yaml @@ -112,7 +112,7 @@ spec: - name: "{{ .Values.configurationEnvPrefix 
}}_EXTRA_INPUT_PATTERNS" value: "{{- range $key, $value := .Values.streams.extraInputPatterns }}{{ $key }}={{ $value }},{{- end }}" {{- end }} - {{- if and (.Values.streams.passApplicationId) (hasKey .Values.autoscaling "consumerGroup") }} + {{- if hasKey .Values.autoscaling "consumerGroup" }} - name: "{{ .Values.configurationEnvPrefix }}_APPLICATION_ID" value: {{ .Values.autoscaling.consumerGroup | quote }} {{- end }} diff --git a/charts/streams-app-cleanup-job/values.yaml b/charts/streams-app-cleanup-job/values.yaml index cb091e05..b26f3ec6 100644 --- a/charts/streams-app-cleanup-job/values.yaml +++ b/charts/streams-app-cleanup-job/values.yaml @@ -15,7 +15,6 @@ configurationEnvPrefix: "APP" streams: # brokers: "test:9092" # schemaRegistryUrl: "url:1234" - passApplicationId: false config: {} # max.poll.records: 500 # Note that YAML may convert large integers to scientific notation. Use Strings to avoid this. diff --git a/charts/streams-app/templates/deployment.yaml b/charts/streams-app/templates/deployment.yaml index 9f357aad..383f4b09 100644 --- a/charts/streams-app/templates/deployment.yaml +++ b/charts/streams-app/templates/deployment.yaml @@ -175,7 +175,7 @@ spec: - name: "{{ .Values.configurationEnvPrefix }}_EXTRA_INPUT_PATTERNS" value: "{{- range $key, $value := .Values.streams.extraInputPatterns }}{{ $key }}={{ $value }},{{- end }}" {{- end }} - {{- if and (.Values.streams.passApplicationId) (hasKey .Values.autoscaling "consumerGroup") }} + {{- if hasKey .Values.autoscaling "consumerGroup" }} - name: "{{ .Values.configurationEnvPrefix }}_APPLICATION_ID" value: {{ .Values.autoscaling.consumerGroup | quote }} {{- end }} diff --git a/charts/streams-app/values.yaml b/charts/streams-app/values.yaml index 479bb98b..0309adb6 100644 --- a/charts/streams-app/values.yaml +++ b/charts/streams-app/values.yaml @@ -27,7 +27,6 @@ streams: # schemaRegistryUrl: "url:1234" staticMembership: false optimizeLeaveGroupBehavior: true - passApplicationId: false config: {} # 
max.poll.records: 500 # Note that YAML may convert large integers to scientific notation. Use Strings to avoid this. From 5fab21bca849a7785b295f34f2912bef3dfd6d11 Mon Sep 17 00:00:00 2001 From: Philipp Schirmer Date: Fri, 26 Jan 2024 09:24:01 +0100 Subject: [PATCH 03/12] Add --application-id parameter --- .../com/bakdata/kafka/KafkaStreamsApplication.java | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/streams-bootstrap/src/main/java/com/bakdata/kafka/KafkaStreamsApplication.java b/streams-bootstrap/src/main/java/com/bakdata/kafka/KafkaStreamsApplication.java index 4e66b77e..0373dda3 100644 --- a/streams-bootstrap/src/main/java/com/bakdata/kafka/KafkaStreamsApplication.java +++ b/streams-bootstrap/src/main/java/com/bakdata/kafka/KafkaStreamsApplication.java @@ -241,17 +241,18 @@ protected StreamsUncaughtExceptionHandler getUncaughtExceptionHandler() { public final String getStreamsApplicationId() { final String uniqueAppId = this.getUniqueAppId(); - if(uniqueAppId == null) { - if(this.applicationId == null) { + if (uniqueAppId == null) { + if (this.applicationId == null) { throw new IllegalArgumentException("Must pass --application-id or implement #getUniqueAppId()"); } return this.applicationId; } - if(this.applicationId == null) { + if (this.applicationId == null) { return uniqueAppId; } - if(!uniqueAppId.equals(this.applicationId)) { - throw new IllegalArgumentException("Application ID provided via --application-id does not match #getUniqueAppId()"); + if (!uniqueAppId.equals(this.applicationId)) { + throw new IllegalArgumentException( + "Application ID provided via --application-id does not match #getUniqueAppId()"); } return uniqueAppId; } From 580fbb747afcdd9a4532ded14fa5f8c3d5af34b1 Mon Sep 17 00:00:00 2001 From: Philipp Schirmer Date: Fri, 26 Jan 2024 09:35:10 +0100 Subject: [PATCH 04/12] Add --application-id parameter --- .../kafka/KafkaStreamsApplication.java | 46 ++++++++----------- 
.../com/bakdata/kafka/CleanUpRunnerTest.java | 2 +- .../kafka/integration/StreamsCleanUpTest.java | 22 ++++----- .../kafka/util/TopologyInformationTest.java | 4 +- 4 files changed, 33 insertions(+), 41 deletions(-) diff --git a/streams-bootstrap/src/main/java/com/bakdata/kafka/KafkaStreamsApplication.java b/streams-bootstrap/src/main/java/com/bakdata/kafka/KafkaStreamsApplication.java index 0373dda3..3049afeb 100644 --- a/streams-bootstrap/src/main/java/com/bakdata/kafka/KafkaStreamsApplication.java +++ b/streams-bootstrap/src/main/java/com/bakdata/kafka/KafkaStreamsApplication.java @@ -95,7 +95,7 @@ public abstract class KafkaStreamsApplication extends KafkaApplication implement description = "Whether the group instance id is volatile, i.e., it will change on a Streams shutdown.") private boolean volatileGroupInstanceId; @CommandLine.Option(names = "--application-id", description = "Application ID to use for Kafka Streams") - private String applicationId; + private String uniqueAppId; private KafkaStreams streams; private Throwable lastException; @@ -157,14 +157,6 @@ public void close() { */ public abstract void buildTopology(StreamsBuilder builder); - /** - * This must be set to a unique value for every application interacting with your kafka cluster to ensure internal - * state encapsulation. 
Could be set to: className-inputTopic-outputTopic - */ - public String getUniqueAppId() { - return null; - } - /** * Create the topology of the Kafka Streams app * @@ -239,24 +231,6 @@ protected StreamsUncaughtExceptionHandler getUncaughtExceptionHandler() { return new DefaultStreamsUncaughtExceptionHandler(); } - public final String getStreamsApplicationId() { - final String uniqueAppId = this.getUniqueAppId(); - if (uniqueAppId == null) { - if (this.applicationId == null) { - throw new IllegalArgumentException("Must pass --application-id or implement #getUniqueAppId()"); - } - return this.applicationId; - } - if (this.applicationId == null) { - return uniqueAppId; - } - if (!uniqueAppId.equals(this.applicationId)) { - throw new IllegalArgumentException( - "Application ID provided via --application-id does not match #getUniqueAppId()"); - } - return uniqueAppId; - } - /** *

This method should give a default configuration to run your streaming application with.

* If {@link KafkaApplication#schemaRegistryUrl} is set {@link SpecificAvroSerde} is set as the default key, value @@ -300,6 +274,24 @@ protected Properties createKafkaProperties() { return kafkaConfig; } + private String getApplicationId() { + final String uniqueAppId = this.getUniqueAppId(); + if (uniqueAppId == null) { + if (this.uniqueAppId == null) { + throw new IllegalArgumentException("Must pass --application-id or implement #getUniqueAppId()"); + } + return this.uniqueAppId; + } + if (this.uniqueAppId == null) { + return uniqueAppId; + } + if (!uniqueAppId.equals(this.uniqueAppId)) { + throw new IllegalArgumentException( + "Application ID provided via --application-id does not match #getUniqueAppId()"); + } + return uniqueAppId; + } + /** * Run the Streams application. This method blocks until Kafka Streams has completed shutdown, either because it * caught an error or the application has received a shutdown event. diff --git a/streams-bootstrap/src/test/java/com/bakdata/kafka/CleanUpRunnerTest.java b/streams-bootstrap/src/test/java/com/bakdata/kafka/CleanUpRunnerTest.java index 1553cc3e..8c10dd21 100644 --- a/streams-bootstrap/src/test/java/com/bakdata/kafka/CleanUpRunnerTest.java +++ b/streams-bootstrap/src/test/java/com/bakdata/kafka/CleanUpRunnerTest.java @@ -40,7 +40,7 @@ class CleanUpRunnerTest { void createTemporaryPropertiesFile() throws IOException { final WordCount wordCount = new WordCount(); wordCount.setInputTopics(List.of("input")); - final File file = CleanUpRunner.createTemporaryPropertiesFile(wordCount.getStreamsApplicationId(), + final File file = CleanUpRunner.createTemporaryPropertiesFile(wordCount.getUniqueAppId(), wordCount.getKafkaProperties()); assertThat(file).exists(); diff --git a/streams-bootstrap/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpTest.java b/streams-bootstrap/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpTest.java index 03be66ee..538b7663 100644 --- 
a/streams-bootstrap/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpTest.java +++ b/streams-bootstrap/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpTest.java @@ -172,7 +172,7 @@ void shouldDeleteConsumerGroup() throws InterruptedException { this.softly.assertThat(adminClient.listConsumerGroups().all().get(TIMEOUT_SECONDS, TimeUnit.SECONDS)) .extracting(ConsumerGroupListing::groupId) .as("Consumer group exists") - .contains(this.app.getStreamsApplicationId()); + .contains(this.app.getUniqueAppId()); } catch (final TimeoutException | ExecutionException e) { throw new RuntimeException("Error retrieving consumer groups", e); } @@ -184,7 +184,7 @@ void shouldDeleteConsumerGroup() throws InterruptedException { this.softly.assertThat(adminClient.listConsumerGroups().all().get(TIMEOUT_SECONDS, TimeUnit.SECONDS)) .extracting(ConsumerGroupListing::groupId) .as("Consumer group is deleted") - .doesNotContain(this.app.getStreamsApplicationId()); + .doesNotContain(this.app.getUniqueAppId()); } catch (final TimeoutException | ExecutionException e) { throw new RuntimeException("Error retrieving consumer groups", e); } @@ -211,7 +211,7 @@ void shouldNotThrowAnErrorIfConsumerGroupDoesNotExist() throws InterruptedExcept this.softly.assertThat(adminClient.listConsumerGroups().all().get(TIMEOUT_SECONDS, TimeUnit.SECONDS)) .extracting(ConsumerGroupListing::groupId) .as("Consumer group exists") - .contains(this.app.getStreamsApplicationId()); + .contains(this.app.getUniqueAppId()); } catch (final TimeoutException | ExecutionException e) { throw new RuntimeException("Error retrieving consumer groups", e); } @@ -219,12 +219,12 @@ void shouldNotThrowAnErrorIfConsumerGroupDoesNotExist() throws InterruptedExcept delay(TIMEOUT_SECONDS, TimeUnit.SECONDS); try (final AdminClient adminClient = AdminClient.create(this.app.getKafkaProperties())) { - adminClient.deleteConsumerGroups(List.of(this.app.getStreamsApplicationId())).all() + 
adminClient.deleteConsumerGroups(List.of(this.app.getUniqueAppId())).all() .get(TIMEOUT_SECONDS, TimeUnit.SECONDS); this.softly.assertThat(adminClient.listConsumerGroups().all().get(TIMEOUT_SECONDS, TimeUnit.SECONDS)) .extracting(ConsumerGroupListing::groupId) .as("Consumer group is deleted") - .doesNotContain(this.app.getStreamsApplicationId()); + .doesNotContain(this.app.getUniqueAppId()); } catch (final TimeoutException | ExecutionException e) { throw new RuntimeException("Error deleting consumer group", e); } @@ -237,9 +237,9 @@ void shouldDeleteInternalTopics() throws InterruptedException { final String inputTopic = this.app.getInputTopic(); final String internalTopic = - this.app.getStreamsApplicationId() + "-KSTREAM-AGGREGATE-STATE-STORE-0000000008-repartition"; + this.app.getUniqueAppId() + "-KSTREAM-AGGREGATE-STATE-STORE-0000000008-repartition"; final String backingTopic = - this.app.getStreamsApplicationId() + "-KSTREAM-REDUCE-STATE-STORE-0000000003-changelog"; + this.app.getUniqueAppId() + "-KSTREAM-REDUCE-STATE-STORE-0000000003-changelog"; final String manualTopic = ComplexTopologyApplication.THROUGH_TOPIC; final TestRecord testRecord = TestRecord.newBuilder().setContent("key 1").build(); @@ -410,9 +410,9 @@ void shouldDeleteSchemaOfInternalTopics() final String inputSubject = this.app.getInputTopic() + "-value"; final String internalSubject = - this.app.getStreamsApplicationId() + "-KSTREAM-AGGREGATE-STATE-STORE-0000000008-repartition" + "-value"; + this.app.getUniqueAppId() + "-KSTREAM-AGGREGATE-STATE-STORE-0000000008-repartition" + "-value"; final String backingSubject = - this.app.getStreamsApplicationId() + "-KSTREAM-REDUCE-STATE-STORE-0000000003-changelog" + "-value"; + this.app.getUniqueAppId() + "-KSTREAM-REDUCE-STATE-STORE-0000000003-changelog" + "-value"; final String manualSubject = ComplexTopologyApplication.THROUGH_TOPIC + "-value"; final SchemaRegistryClient client = this.schemaRegistryMockExtension.getSchemaRegistryClient(); @@ -469,7 
+469,7 @@ void shouldCallCleanupHookForInternalTopics() { this.app = this.createComplexCleanUpHookApplication(); this.runCleanUp(); - final String uniqueAppId = this.app.getStreamsApplicationId(); + final String uniqueAppId = this.app.getUniqueAppId(); verify(this.topicCleanUpHook).accept(uniqueAppId + "-KSTREAM-AGGREGATE-STATE-STORE-0000000008-repartition"); verify(this.topicCleanUpHook).accept(uniqueAppId + "-KSTREAM-AGGREGATE-STATE-STORE-0000000008-changelog"); verify(this.topicCleanUpHook).accept(uniqueAppId + "-KSTREAM-REDUCE-STATE-STORE-0000000003-changelog"); @@ -481,7 +481,7 @@ void shouldCallCleanUpHookForAllTopics() { this.app = this.createComplexCleanUpHookApplication(); this.runCleanUpWithDeletion(); - final String uniqueAppId = this.app.getStreamsApplicationId(); + final String uniqueAppId = this.app.getUniqueAppId(); verify(this.topicCleanUpHook).accept(uniqueAppId + "-KSTREAM-AGGREGATE-STATE-STORE-0000000008-repartition"); verify(this.topicCleanUpHook).accept(uniqueAppId + "-KSTREAM-AGGREGATE-STATE-STORE-0000000008-changelog"); verify(this.topicCleanUpHook).accept(uniqueAppId + "-KSTREAM-REDUCE-STATE-STORE-0000000003-changelog"); diff --git a/streams-bootstrap/src/test/java/com/bakdata/kafka/util/TopologyInformationTest.java b/streams-bootstrap/src/test/java/com/bakdata/kafka/util/TopologyInformationTest.java index 1fc24a5a..f2e7e9e5 100644 --- a/streams-bootstrap/src/test/java/com/bakdata/kafka/util/TopologyInformationTest.java +++ b/streams-bootstrap/src/test/java/com/bakdata/kafka/util/TopologyInformationTest.java @@ -49,7 +49,7 @@ void setup() { this.app = new ComplexTopologyApplication(); this.app.setInputTopics(List.of("input", "input2")); this.app.setOutputTopic("output"); - this.topologyInformation = new TopologyInformation(this.app.createTopology(), this.app.getStreamsApplicationId()); + this.topologyInformation = new TopologyInformation(this.app.createTopology(), this.app.getUniqueAppId()); } @Test @@ -131,7 +131,7 @@ void 
shouldNotReturnInputTopics() { void shouldReturnAllInternalTopics() { assertThat(this.topologyInformation.getInternalTopics()) .hasSize(3) - .allMatch(topic -> topic.contains("-KSTREAM-") && topic.startsWith(this.app.getStreamsApplicationId()) + .allMatch(topic -> topic.contains("-KSTREAM-") && topic.startsWith(this.app.getUniqueAppId()) || topic.startsWith("KSTREAM-")) .allMatch(topic -> topic.endsWith("-changelog") || topic.endsWith("-repartition")); } From 3d572de67b4eb3284d0d8eb04c74248918dc1db9 Mon Sep 17 00:00:00 2001 From: Philipp Schirmer Date: Fri, 26 Jan 2024 09:44:51 +0100 Subject: [PATCH 05/12] Add --application-id parameter --- .../src/test/java/com/bakdata/kafka/CleanUpRunnerTest.java | 2 +- .../java/com/bakdata/kafka/integration/StreamsCleanUpTest.java | 2 +- .../java/com/bakdata/kafka/util/TopologyInformationTest.java | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/streams-bootstrap/src/test/java/com/bakdata/kafka/CleanUpRunnerTest.java b/streams-bootstrap/src/test/java/com/bakdata/kafka/CleanUpRunnerTest.java index 8c10dd21..667028a7 100644 --- a/streams-bootstrap/src/test/java/com/bakdata/kafka/CleanUpRunnerTest.java +++ b/streams-bootstrap/src/test/java/com/bakdata/kafka/CleanUpRunnerTest.java @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2024 bakdata + * Copyright (c) 2023 bakdata * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal diff --git a/streams-bootstrap/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpTest.java b/streams-bootstrap/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpTest.java index 538b7663..8e21f9eb 100644 --- a/streams-bootstrap/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpTest.java +++ b/streams-bootstrap/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpTest.java @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2024 bakdata + * Copyright (c) 
2023 bakdata * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal diff --git a/streams-bootstrap/src/test/java/com/bakdata/kafka/util/TopologyInformationTest.java b/streams-bootstrap/src/test/java/com/bakdata/kafka/util/TopologyInformationTest.java index f2e7e9e5..6ea870a3 100644 --- a/streams-bootstrap/src/test/java/com/bakdata/kafka/util/TopologyInformationTest.java +++ b/streams-bootstrap/src/test/java/com/bakdata/kafka/util/TopologyInformationTest.java @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2024 bakdata + * Copyright (c) 2023 bakdata * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal From 615d351d56846eea830fb852387c6718f8f380bc Mon Sep 17 00:00:00 2001 From: Philipp Schirmer Date: Fri, 26 Jan 2024 10:07:58 +0100 Subject: [PATCH 06/12] Add --application-id parameter --- azure-pipelines.yml | 1 + charts/streams-app-cleanup-job/templates/job.yaml | 2 +- charts/streams-app/templates/deployment.yaml | 2 +- .../java/com/bakdata/kafka/KafkaStreamsApplication.java | 6 +++--- 4 files changed, 6 insertions(+), 5 deletions(-) diff --git a/azure-pipelines.yml b/azure-pipelines.yml index fc9eca7b..d97de562 100644 --- a/azure-pipelines.yml +++ b/azure-pipelines.yml @@ -19,6 +19,7 @@ resources: - repository: templates type: github name: bakdata/bakdata-project-templates + ref: tmp/upload-snapshot endpoint: bot jobs: diff --git a/charts/streams-app-cleanup-job/templates/job.yaml b/charts/streams-app-cleanup-job/templates/job.yaml index 60b885e9..b5fd2cc8 100644 --- a/charts/streams-app-cleanup-job/templates/job.yaml +++ b/charts/streams-app-cleanup-job/templates/job.yaml @@ -113,7 +113,7 @@ spec: value: "{{- range $key, $value := .Values.streams.extraInputPatterns }}{{ $key }}={{ $value }},{{- end }}" {{- end }} {{- if hasKey .Values.autoscaling 
"consumerGroup" }} - - name: "{{ .Values.configurationEnvPrefix }}_APPLICATION_ID" + - name: "{{ .Values.configurationEnvPrefix }}_UNIQUE_APP_ID" value: {{ .Values.autoscaling.consumerGroup | quote }} {{- end }} {{- range $key, $value := .Values.secrets }} diff --git a/charts/streams-app/templates/deployment.yaml b/charts/streams-app/templates/deployment.yaml index 383f4b09..a20af762 100644 --- a/charts/streams-app/templates/deployment.yaml +++ b/charts/streams-app/templates/deployment.yaml @@ -176,7 +176,7 @@ spec: value: "{{- range $key, $value := .Values.streams.extraInputPatterns }}{{ $key }}={{ $value }},{{- end }}" {{- end }} {{- if hasKey .Values.autoscaling "consumerGroup" }} - - name: "{{ .Values.configurationEnvPrefix }}_APPLICATION_ID" + - name: "{{ .Values.configurationEnvPrefix }}_UNIQUE_APP_ID" value: {{ .Values.autoscaling.consumerGroup | quote }} {{- end }} {{- range $key, $value := .Values.secrets }} diff --git a/streams-bootstrap/src/main/java/com/bakdata/kafka/KafkaStreamsApplication.java b/streams-bootstrap/src/main/java/com/bakdata/kafka/KafkaStreamsApplication.java index 3049afeb..7276db4b 100644 --- a/streams-bootstrap/src/main/java/com/bakdata/kafka/KafkaStreamsApplication.java +++ b/streams-bootstrap/src/main/java/com/bakdata/kafka/KafkaStreamsApplication.java @@ -94,7 +94,7 @@ public abstract class KafkaStreamsApplication extends KafkaApplication implement @CommandLine.Option(names = "--volatile-group-instance-id", arity = "0..1", description = "Whether the group instance id is volatile, i.e., it will change on a Streams shutdown.") private boolean volatileGroupInstanceId; - @CommandLine.Option(names = "--application-id", description = "Application ID to use for Kafka Streams") + @CommandLine.Option(names = "--unique-app-id", description = "Application ID to use for Kafka Streams") private String uniqueAppId; private KafkaStreams streams; private Throwable lastException; @@ -278,7 +278,7 @@ private String getApplicationId() { final String 
uniqueAppId = this.getUniqueAppId(); if (uniqueAppId == null) { if (this.uniqueAppId == null) { - throw new IllegalArgumentException("Must pass --application-id or implement #getUniqueAppId()"); + throw new IllegalArgumentException("Must pass --unique-app-id or implement #getUniqueAppId()"); } return this.uniqueAppId; } @@ -287,7 +287,7 @@ private String getApplicationId() { } if (!uniqueAppId.equals(this.uniqueAppId)) { throw new IllegalArgumentException( - "Application ID provided via --application-id does not match #getUniqueAppId()"); + "Application ID provided via --unique-app-id does not match #getUniqueAppId()"); } return uniqueAppId; } From 1dbec0375008ea25d0c2bda3d55cf0645aaddef8 Mon Sep 17 00:00:00 2001 From: Philipp Schirmer Date: Fri, 26 Jan 2024 10:09:18 +0100 Subject: [PATCH 07/12] Add --application-id parameter --- charts/streams-app-cleanup-job/templates/job.yaml | 2 +- charts/streams-app-cleanup-job/values.yaml | 1 + charts/streams-app/templates/deployment.yaml | 2 +- charts/streams-app/values.yaml | 1 + 4 files changed, 4 insertions(+), 2 deletions(-) diff --git a/charts/streams-app-cleanup-job/templates/job.yaml b/charts/streams-app-cleanup-job/templates/job.yaml index b5fd2cc8..4c413a3e 100644 --- a/charts/streams-app-cleanup-job/templates/job.yaml +++ b/charts/streams-app-cleanup-job/templates/job.yaml @@ -112,7 +112,7 @@ spec: - name: "{{ .Values.configurationEnvPrefix }}_EXTRA_INPUT_PATTERNS" value: "{{- range $key, $value := .Values.streams.extraInputPatterns }}{{ $key }}={{ $value }},{{- end }}" {{- end }} - {{- if hasKey .Values.autoscaling "consumerGroup" }} + {{- if and (.Values.streams.passApplicationId) (hasKey .Values.autoscaling "consumerGroup") }} - name: "{{ .Values.configurationEnvPrefix }}_UNIQUE_APP_ID" value: {{ .Values.autoscaling.consumerGroup | quote }} {{- end }} diff --git a/charts/streams-app-cleanup-job/values.yaml b/charts/streams-app-cleanup-job/values.yaml index b26f3ec6..164e37ab 100644 --- 
a/charts/streams-app-cleanup-job/values.yaml +++ b/charts/streams-app-cleanup-job/values.yaml @@ -15,6 +15,7 @@ configurationEnvPrefix: "APP" streams: # brokers: "test:9092" # schemaRegistryUrl: "url:1234" + passApplicationId: true config: {} # max.poll.records: 500 # Note that YAML may convert large integers to scientific notation. Use Strings to avoid this. diff --git a/charts/streams-app/templates/deployment.yaml b/charts/streams-app/templates/deployment.yaml index a20af762..897ba33a 100644 --- a/charts/streams-app/templates/deployment.yaml +++ b/charts/streams-app/templates/deployment.yaml @@ -175,7 +175,7 @@ spec: - name: "{{ .Values.configurationEnvPrefix }}_EXTRA_INPUT_PATTERNS" value: "{{- range $key, $value := .Values.streams.extraInputPatterns }}{{ $key }}={{ $value }},{{- end }}" {{- end }} - {{- if hasKey .Values.autoscaling "consumerGroup" }} + {{- if and (.Values.streams.passApplicationId) (hasKey .Values.autoscaling "consumerGroup") }} - name: "{{ .Values.configurationEnvPrefix }}_UNIQUE_APP_ID" value: {{ .Values.autoscaling.consumerGroup | quote }} {{- end }} diff --git a/charts/streams-app/values.yaml b/charts/streams-app/values.yaml index 0309adb6..5abe8aad 100644 --- a/charts/streams-app/values.yaml +++ b/charts/streams-app/values.yaml @@ -27,6 +27,7 @@ streams: # schemaRegistryUrl: "url:1234" staticMembership: false optimizeLeaveGroupBehavior: true + passApplicationId: true config: {} # max.poll.records: 500 # Note that YAML may convert large integers to scientific notation. Use Strings to avoid this. 
From 32634a96c9a664961b938aa3bdb131bae4dddffb Mon Sep 17 00:00:00 2001 From: Philipp Schirmer Date: Fri, 26 Jan 2024 10:20:01 +0100 Subject: [PATCH 08/12] Add --application-id parameter --- README.md | 3 +++ charts/streams-app/README.md | 3 ++- .../java/com/bakdata/kafka/KafkaStreamsApplication.java | 8 +++++++- 3 files changed, 12 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 9c35f500..d917405d 100644 --- a/README.md +++ b/README.md @@ -111,6 +111,9 @@ The following configuration options are available: - `--extra-output-topics`: Additional named output topics if you need to specify multiple topics with different message types (`String=String>[,...]`) +- `--unique-app-id`: Unique application ID to use for Kafka Streams. Can also be provided by implementing + #getUniqueAppId() + - `--volatile-group-instance-id`: Whether the group instance id is volatile, i.e., it will change on a Streams shutdown. - `--clean-up`: Whether the state of the Kafka Streams app, i.e., offsets and state stores and auto-created topics, diff --git a/charts/streams-app/README.md b/charts/streams-app/README.md index fbbf1739..bce771b9 100644 --- a/charts/streams-app/README.md +++ b/charts/streams-app/README.md @@ -49,11 +49,12 @@ Alternatively, a YAML file that specifies the values for the parameters can be p ### Streams | Parameter | Description | Default | -| ------------------------------------ | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ------- | +|--------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|---------| | `streams.brokers` | Comma separated list of Kafka brokers to connect to. | | | `streams.schemaRegistryUrl` | URL of Schema Registry to connect to. 
| `null` | | `streams.staticMembership` | Whether to use [Kafka Static Group Membership](https://cwiki.apache.org/confluence/display/KAFKA/KIP-345%3A+Introduce+static+membership+protocol+to+reduce+consumer+rebalances). | `false` | | `streams.optimizeLeaveGroupBehavior` | Enabling this optimizes the leave group behavior when a pod is terminated. Depends on the deployment kind, i.e., `statefulSet`. Requires the app to use streams-bootstrap 2.7+. | `true` | +| `streams.passApplicationId` | Configure the unique application ID via `autoscaling.consumerGroup`. Requires the app to use streams-bootstrap 2.16+. | `true` | | `streams.config` | Configurations for your [Kafka Streams app](https://kafka.apache.org/documentation/#streamsconfigs). | `{}` | | `streams.inputTopics` | List of input topics for your streams application. | `[]` | | `streams.extraInputTopics` | Map of additional named input topics if you need to specify multiple topics with different message types. | `{}` | diff --git a/streams-bootstrap/src/main/java/com/bakdata/kafka/KafkaStreamsApplication.java b/streams-bootstrap/src/main/java/com/bakdata/kafka/KafkaStreamsApplication.java index 7276db4b..a0d40132 100644 --- a/streams-bootstrap/src/main/java/com/bakdata/kafka/KafkaStreamsApplication.java +++ b/streams-bootstrap/src/main/java/com/bakdata/kafka/KafkaStreamsApplication.java @@ -94,7 +94,13 @@ public abstract class KafkaStreamsApplication extends KafkaApplication implement @CommandLine.Option(names = "--volatile-group-instance-id", arity = "0..1", description = "Whether the group instance id is volatile, i.e., it will change on a Streams shutdown.") private boolean volatileGroupInstanceId; - @CommandLine.Option(names = "--unique-app-id", description = "Application ID to use for Kafka Streams") + /** + * This must be set to a unique value for every application interacting with your kafka cluster to ensure internal + * state encapsulation. 
Could be set to: className-inputTopic-outputTopic + */ + @CommandLine.Option(names = "--unique-app-id", + description = "Unique application ID to use for Kafka Streams. Can also be provided by implementing " + + "#getUniqueAppId()") private String uniqueAppId; private KafkaStreams streams; private Throwable lastException; From ec6d53d63cc0cf8622e7e002ab311d81fc479541 Mon Sep 17 00:00:00 2001 From: Philipp Schirmer Date: Fri, 26 Jan 2024 10:36:06 +0100 Subject: [PATCH 09/12] Add --application-id parameter --- charts/streams-app/README.md | 27 ++++++++++--------- .../streams-app/templates/scaled-object.yaml | 13 +++++++-- charts/streams-app/values.yaml | 8 ++++-- 3 files changed, 31 insertions(+), 17 deletions(-) diff --git a/charts/streams-app/README.md b/charts/streams-app/README.md index bce771b9..0005585f 100644 --- a/charts/streams-app/README.md +++ b/charts/streams-app/README.md @@ -98,19 +98,20 @@ Alternatively, a YAML file that specifies the values for the parameters can be p ### Auto-Scaling -| Parameter | Description | Default | -| -------------------------------- | -------------------------------------------------------------------------------------------------------- | ---------- | -| `autoscaling.enabled` | Whether to enable auto-scaling using [KEDA](https://keda.sh/docs/latest/scalers/apache-kafka/). | `false` | -| `autoscaling.consumerGroup` | Name of the consumer group used for checking the offset on the topic and processing the related lag. | | -| `autoscaling.lagThreshold` | Average target value to trigger scaling actions. | | -| `autoscaling.pollingInterval` | https://keda.sh/docs/2.10/concepts/scaling-deployments/#pollinginterval | `30` | -| `autoscaling.cooldownPeriod` | https://keda.sh/docs/2.10/concepts/scaling-deployments/#cooldownperiod | `300` | -| `autoscaling.offsetResetPolicy` | The offset reset policy for the consumer if the the consumer group is not yet subscribed to a partition. 
| `earliest` | -| `autoscaling.minReplicas` | https://keda.sh/docs/2.10/concepts/scaling-deployments/#minreplicacount | `0` | -| `autoscaling.maxReplicas` | https://keda.sh/docs/2.10/concepts/scaling-deployments/#maxreplicacount | `1` | -| `autoscaling.idleReplicas` | https://keda.sh/docs/2.10/concepts/scaling-deployments/#idlereplicacount | | -| `autoscaling.topics` | List of auto-generated Kafka Streams topics used by the streams app. | `[]` | -| `autoscaling.additionalTriggers` | List of additional KEDA triggers, see https://keda.sh/docs/latest/scalers/ | `[]` | +| Parameter | Description | Default | +|----------------------------------|--------------------------------------------------------------------------------------------------------------------|------------| +| `autoscaling.enabled` | Whether to enable auto-scaling using [KEDA](https://keda.sh/docs/latest/scalers/apache-kafka/). | `false` | +| `autoscaling.consumerGroup` | Name of the consumer group used for checking the offset on the topic and processing the related lag. | | +| `autoscaling.lagThreshold` | Average target value to trigger scaling actions. | | +| `autoscaling.pollingInterval` | https://keda.sh/docs/2.10/concepts/scaling-deployments/#pollinginterval | `30` | +| `autoscaling.cooldownPeriod` | https://keda.sh/docs/2.10/concepts/scaling-deployments/#cooldownperiod | `300` | +| `autoscaling.offsetResetPolicy` | The offset reset policy for the consumer if the consumer group is not yet subscribed to a partition. | `earliest` | +| `autoscaling.minReplicas` | https://keda.sh/docs/2.10/concepts/scaling-deployments/#minreplicacount | `0` | +| `autoscaling.maxReplicas` | https://keda.sh/docs/2.10/concepts/scaling-deployments/#maxreplicacount | `1` | +| `autoscaling.idleReplicas` | https://keda.sh/docs/2.10/concepts/scaling-deployments/#idlereplicacount | | +| `autoscaling.internalTopics` | List of auto-generated Kafka Streams topics used by the streams app.
Consumer group prefix is added automatically. | `[]` | +| `autoscaling.topics` | List of topics used by the streams app. | `[]` | +| `autoscaling.additionalTriggers` | List of additional KEDA triggers, see https://keda.sh/docs/latest/scalers/ | `[]` | ### JVM diff --git a/charts/streams-app/templates/scaled-object.yaml b/charts/streams-app/templates/scaled-object.yaml index 6d2466d7..85b23303 100644 --- a/charts/streams-app/templates/scaled-object.yaml +++ b/charts/streams-app/templates/scaled-object.yaml @@ -26,8 +26,8 @@ spec: idleReplicaCount: {{ .Values.autoscaling.idleReplicas }} {{- end }} triggers: - {{- if not (or .Values.streams.inputTopics .Values.autoscaling.topics .Values.streams.extraInputTopics .Values.autoscaling.additionalTriggers) }} - {{- fail "To use autoscaling, you must define one of .Values.streams.inputTopics, .Values.autoscaling.topics, .Values.streams.extraInputTopics or .Values.autoscaling.additionalTriggers" }} + {{- if not (or .Values.streams.inputTopics .Values.autoscaling.internalTopics .Values.autoscaling.topics .Values.streams.extraInputTopics .Values.autoscaling.additionalTriggers) }} + {{- fail "To use autoscaling, you must define one of .Values.streams.inputTopics, .Values.autoscaling.internalTopics, .Values.autoscaling.topics, .Values.streams.extraInputTopics or .Values.autoscaling.additionalTriggers" }} {{- end}} # todo: concat .Values.streams.inputTopics and .Values.autoscaling.topics to # minimize number of loops when we don't need to support helm 2 anymore @@ -40,6 +40,15 @@ spec: lagThreshold: {{ $root.Values.autoscaling.lagThreshold | quote }} offsetResetPolicy: {{ $root.Values.autoscaling.offsetResetPolicy }} {{- end }} + {{- range .Values.autoscaling.internalTopics }} + - type: kafka + metadata: + bootstrapServers: {{ $root.Values.streams.brokers }} + consumerGroup: {{ $root.Values.autoscaling.consumerGroup }} + topic: {{ printf "%s-%s" $root.Values.autoscaling.consumerGroup . 
| quote }} + lagThreshold: {{ $root.Values.autoscaling.lagThreshold | quote }} + offsetResetPolicy: {{ $root.Values.autoscaling.offsetResetPolicy }} + {{- end }} {{- range .Values.autoscaling.topics }} - type: kafka metadata: diff --git a/charts/streams-app/values.yaml b/charts/streams-app/values.yaml index 5abe8aad..06776dae 100644 --- a/charts/streams-app/values.yaml +++ b/charts/streams-app/values.yaml @@ -103,8 +103,12 @@ autoscaling: minReplicas: 0 maxReplicas: 1 # idleReplicas: 0 - ## all topics from streams.inputTopics and streams.extraInputTopics are automatically taken, so - ## only use the 'topics' option for adding more topics, e.g., internal topics + ## all topics from streams.inputTopics and streams.extraInputTopics are automatically taken + ## only use the 'internalTopics' option for adding internal topics, i.e., auto-generated topics by Kafka Streams. Consumer group name will automatically be added as a prefix + internalTopics: [ ] + # - bar-repartition # results in foo-bar-repartition + # - baz-join-subscription-response-topic # results in foo-baz-join-subscription-response-topic + ## only use the 'topics' option for adding more topics, e.g., intermediate topics, not covered by `internalTopics` topics: [] # - bar # - baz From 6a4cc9625ec52c8e83a547929ad503317575b1a1 Mon Sep 17 00:00:00 2001 From: Philipp Schirmer Date: Fri, 26 Jan 2024 10:41:42 +0100 Subject: [PATCH 10/12] Add --application-id parameter --- charts/streams-app/values.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/charts/streams-app/values.yaml b/charts/streams-app/values.yaml index 06776dae..3954a377 100644 --- a/charts/streams-app/values.yaml +++ b/charts/streams-app/values.yaml @@ -105,7 +105,7 @@ autoscaling: # idleReplicas: 0 ## all topics from streams.inputTopics and streams.extraInputTopics are automatically taken ## only use the 'internalTopics' option for adding internal topics, i.e., auto-generated topics by Kafka Streams. 
Consumer group name will automatically be added as a prefix - internalTopics: [ ] + internalTopics: [] # - bar-repartition # results in foo-bar-repartition # - baz-join-subscription-response-topic # results in foo-baz-join-subscription-response-topic ## only use the 'topics' option for adding more topics, e.g., intermediate topics, not covered by `internalTopics` From 3d13af08bbaceb4eb8ce29440ad7fc755a1bc692 Mon Sep 17 00:00:00 2001 From: Philipp Schirmer Date: Fri, 26 Jan 2024 10:43:31 +0100 Subject: [PATCH 11/12] Add --application-id parameter --- streams-bootstrap/build.gradle.kts | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/streams-bootstrap/build.gradle.kts b/streams-bootstrap/build.gradle.kts index de645e34..60394f64 100644 --- a/streams-bootstrap/build.gradle.kts +++ b/streams-bootstrap/build.gradle.kts @@ -49,3 +49,10 @@ dependencies { testImplementation(group = "com.ginsberg", name = "junit5-system-exit", version = "1.1.2") } + +tasks.withType<Test> { + jvmArgs( + "--add-opens=java.base/java.lang=ALL-UNNAMED", + "--add-opens=java.base/java.util=ALL-UNNAMED" + ) +} From 31530acba6dbd3cd2518749d820411a087c15b22 Mon Sep 17 00:00:00 2001 From: Philipp Schirmer Date: Fri, 26 Jan 2024 14:15:54 +0100 Subject: [PATCH 12/12] Update --- charts/streams-app/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/charts/streams-app/README.md b/charts/streams-app/README.md index 0005585f..3b775690 100644 --- a/charts/streams-app/README.md +++ b/charts/streams-app/README.md @@ -54,7 +54,7 @@ Alternatively, a YAML file that specifies the values for the parameters can be p | `streams.schemaRegistryUrl` | URL of Schema Registry to connect to. | `null` | | `streams.staticMembership` | Whether to use [Kafka Static Group Membership](https://cwiki.apache.org/confluence/display/KAFKA/KIP-345%3A+Introduce+static+membership+protocol+to+reduce+consumer+rebalances).
| `false` | | `streams.optimizeLeaveGroupBehavior` | Enabling this optimizes the leave group behavior when a pod is terminated. Depends on the deployment kind, i.e., `statefulSet`. Requires the app to use streams-bootstrap 2.7+. | `true` | -| `streams.passApplicationId` | Configure the unique application ID via `autoscaling.consumerGroup`. Requires the app to use streams-bootstrap 2.16+. | `true` | +| `streams.passApplicationId` | Configure the unique application ID via `autoscaling.consumerGroup`. Requires the app to use streams-bootstrap 2.17+. | `true` | | `streams.config` | Configurations for your [Kafka Streams app](https://kafka.apache.org/documentation/#streamsconfigs). | `{}` | | `streams.inputTopics` | List of input topics for your streams application. | `[]` | | `streams.extraInputTopics` | Map of additional named input topics if you need to specify multiple topics with different message types. | `{}` |