Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add --unique-app-id parameter #197

Closed
wants to merge 14 commits into from
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions azure-pipelines.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ resources:

jobs:
- template: azure/gradle/build.yml@templates
parameters:
jdkVersion: '1.17'
- template: azure/gradle/create_tag_version.yml@templates
- template: azure/gradle/upload_release.yml@templates
- template: azure/gradle/upload_snapshot.yml@templates
4 changes: 2 additions & 2 deletions build.gradle.kts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
plugins {
id("net.researchgate.release") version "3.0.2"
id("com.bakdata.sonar") version "1.1.7"
id("com.bakdata.sonatype") version "1.1.7"
id("com.bakdata.sonar") version "1.1.9"
id("com.bakdata.sonatype") version "1.1.9"
id("org.hildan.github.changelog") version "1.12.1"
id("io.freefair.lombok") version "6.6.1"
}
Expand Down
4 changes: 4 additions & 0 deletions charts/streams-app-cleanup-job/templates/job.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,10 @@ spec:
- name: "{{ .Values.configurationEnvPrefix }}_EXTRA_INPUT_PATTERNS"
value: "{{- range $key, $value := .Values.streams.extraInputPatterns }}{{ $key }}={{ $value }},{{- end }}"
{{- end }}
{{- if hasKey .Values.autoscaling "consumerGroup" }}
- name: "{{ .Values.configurationEnvPrefix }}_APPLICATION_ID"
value: {{ .Values.autoscaling.consumerGroup | quote }}
{{- end }}
{{- range $key, $value := .Values.secrets }}
- name: "{{ $key }}"
valueFrom:
Expand Down
3 changes: 3 additions & 0 deletions charts/streams-app-cleanup-job/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ labels: {}

# serviceAccountName: foo

autoscaling: {}
# consumerGroup: foo

tolerations: []
# - key: "foo"
# operator: "Exists"
Expand Down
4 changes: 4 additions & 0 deletions charts/streams-app/templates/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,10 @@ spec:
- name: "{{ .Values.configurationEnvPrefix }}_EXTRA_INPUT_PATTERNS"
value: "{{- range $key, $value := .Values.streams.extraInputPatterns }}{{ $key }}={{ $value }},{{- end }}"
{{- end }}
{{- if hasKey .Values.autoscaling "consumerGroup" }}
- name: "{{ .Values.configurationEnvPrefix }}_APPLICATION_ID"
value: {{ .Values.autoscaling.consumerGroup | quote }}
{{- end }}
{{- range $key, $value := .Values.secrets }}
- name: "{{ $key }}"
valueFrom:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/*
* MIT License
*
* Copyright (c) 2023 bakdata
* Copyright (c) 2024 bakdata
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
Expand Down Expand Up @@ -94,6 +94,8 @@ public abstract class KafkaStreamsApplication extends KafkaApplication implement
@CommandLine.Option(names = "--volatile-group-instance-id", arity = "0..1",
description = "Whether the group instance id is volatile, i.e., it will change on a Streams shutdown.")
private boolean volatileGroupInstanceId;
@CommandLine.Option(names = "--application-id", description = "Application ID to use for Kafka Streams")
private String applicationId;
private KafkaStreams streams;
private Throwable lastException;

Expand Down Expand Up @@ -159,7 +161,9 @@ public void close() {
* This must be set to a unique value for every application interacting with your kafka cluster to ensure internal
* state encapsulation. Could be set to: className-inputTopic-outputTopic
*/
public abstract String getUniqueAppId();
public String getUniqueAppId() {
raminqaf marked this conversation as resolved.
Show resolved Hide resolved
return null;
}

/**
* Create the topology of the Kafka Streams app
Expand Down Expand Up @@ -235,6 +239,24 @@ protected StreamsUncaughtExceptionHandler getUncaughtExceptionHandler() {
return new DefaultStreamsUncaughtExceptionHandler();
}

public final String getStreamsApplicationId() {
final String uniqueAppId = this.getUniqueAppId();
if (uniqueAppId == null) {
if (this.applicationId == null) {
throw new IllegalArgumentException("Must pass --application-id or implement #getUniqueAppId()");
}
return this.applicationId;
}
if (this.applicationId == null) {
return uniqueAppId;
}
if (!uniqueAppId.equals(this.applicationId)) {
throw new IllegalArgumentException(
"Application ID provided via --application-id does not match #getUniqueAppId()");
}
return uniqueAppId;
}

/**
* <p>This method should give a default configuration to run your streaming application with.</p>
* If {@link KafkaApplication#schemaRegistryUrl} is set {@link SpecificAvroSerde} is set as the default key, value
Expand Down Expand Up @@ -271,7 +293,7 @@ protected Properties createKafkaProperties() {
kafkaConfig.setProperty(StreamsConfig.producerPrefix(ProducerConfig.COMPRESSION_TYPE_CONFIG), "gzip");

// topology
kafkaConfig.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, this.getUniqueAppId());
kafkaConfig.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, this.getApplicationId());

this.configureDefaultSerde(kafkaConfig);
kafkaConfig.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, this.getBrokers());
Expand Down Expand Up @@ -322,7 +344,7 @@ protected void runCleanUp() {
try (final ImprovedAdminClient adminClient = this.createAdminClient()) {
final CleanUpRunner cleanUpRunner = CleanUpRunner.builder()
.topology(this.createTopology())
.appId(this.getUniqueAppId())
.appId(this.getApplicationId())
.adminClient(adminClient)
.streams(this.streams)
.build();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/*
* MIT License
*
* Copyright (c) 2023 bakdata
* Copyright (c) 2024 bakdata
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
Expand Down Expand Up @@ -40,7 +40,7 @@ class CleanUpRunnerTest {
void createTemporaryPropertiesFile() throws IOException {
final WordCount wordCount = new WordCount();
wordCount.setInputTopics(List.of("input"));
final File file = CleanUpRunner.createTemporaryPropertiesFile(wordCount.getUniqueAppId(),
final File file = CleanUpRunner.createTemporaryPropertiesFile(wordCount.getStreamsApplicationId(),
wordCount.getKafkaProperties());

assertThat(file).exists();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/*
* MIT License
*
* Copyright (c) 2023 bakdata
* Copyright (c) 2024 bakdata
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
Expand Down Expand Up @@ -172,7 +172,7 @@ void shouldDeleteConsumerGroup() throws InterruptedException {
this.softly.assertThat(adminClient.listConsumerGroups().all().get(TIMEOUT_SECONDS, TimeUnit.SECONDS))
.extracting(ConsumerGroupListing::groupId)
.as("Consumer group exists")
.contains(this.app.getUniqueAppId());
.contains(this.app.getStreamsApplicationId());
} catch (final TimeoutException | ExecutionException e) {
throw new RuntimeException("Error retrieving consumer groups", e);
}
Expand All @@ -184,7 +184,7 @@ void shouldDeleteConsumerGroup() throws InterruptedException {
this.softly.assertThat(adminClient.listConsumerGroups().all().get(TIMEOUT_SECONDS, TimeUnit.SECONDS))
.extracting(ConsumerGroupListing::groupId)
.as("Consumer group is deleted")
.doesNotContain(this.app.getUniqueAppId());
.doesNotContain(this.app.getStreamsApplicationId());
} catch (final TimeoutException | ExecutionException e) {
throw new RuntimeException("Error retrieving consumer groups", e);
}
Expand All @@ -211,20 +211,20 @@ void shouldNotThrowAnErrorIfConsumerGroupDoesNotExist() throws InterruptedExcept
this.softly.assertThat(adminClient.listConsumerGroups().all().get(TIMEOUT_SECONDS, TimeUnit.SECONDS))
.extracting(ConsumerGroupListing::groupId)
.as("Consumer group exists")
.contains(this.app.getUniqueAppId());
.contains(this.app.getStreamsApplicationId());
} catch (final TimeoutException | ExecutionException e) {
throw new RuntimeException("Error retrieving consumer groups", e);
}

delay(TIMEOUT_SECONDS, TimeUnit.SECONDS);

try (final AdminClient adminClient = AdminClient.create(this.app.getKafkaProperties())) {
adminClient.deleteConsumerGroups(List.of(this.app.getUniqueAppId())).all()
adminClient.deleteConsumerGroups(List.of(this.app.getStreamsApplicationId())).all()
.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
this.softly.assertThat(adminClient.listConsumerGroups().all().get(TIMEOUT_SECONDS, TimeUnit.SECONDS))
.extracting(ConsumerGroupListing::groupId)
.as("Consumer group is deleted")
.doesNotContain(this.app.getUniqueAppId());
.doesNotContain(this.app.getStreamsApplicationId());
} catch (final TimeoutException | ExecutionException e) {
throw new RuntimeException("Error deleting consumer group", e);
}
Expand All @@ -237,9 +237,9 @@ void shouldDeleteInternalTopics() throws InterruptedException {

final String inputTopic = this.app.getInputTopic();
final String internalTopic =
this.app.getUniqueAppId() + "-KSTREAM-AGGREGATE-STATE-STORE-0000000008-repartition";
this.app.getStreamsApplicationId() + "-KSTREAM-AGGREGATE-STATE-STORE-0000000008-repartition";
final String backingTopic =
this.app.getUniqueAppId() + "-KSTREAM-REDUCE-STATE-STORE-0000000003-changelog";
this.app.getStreamsApplicationId() + "-KSTREAM-REDUCE-STATE-STORE-0000000003-changelog";
final String manualTopic = ComplexTopologyApplication.THROUGH_TOPIC;

final TestRecord testRecord = TestRecord.newBuilder().setContent("key 1").build();
Expand Down Expand Up @@ -410,9 +410,9 @@ void shouldDeleteSchemaOfInternalTopics()

final String inputSubject = this.app.getInputTopic() + "-value";
final String internalSubject =
this.app.getUniqueAppId() + "-KSTREAM-AGGREGATE-STATE-STORE-0000000008-repartition" + "-value";
this.app.getStreamsApplicationId() + "-KSTREAM-AGGREGATE-STATE-STORE-0000000008-repartition" + "-value";
final String backingSubject =
this.app.getUniqueAppId() + "-KSTREAM-REDUCE-STATE-STORE-0000000003-changelog" + "-value";
this.app.getStreamsApplicationId() + "-KSTREAM-REDUCE-STATE-STORE-0000000003-changelog" + "-value";
final String manualSubject = ComplexTopologyApplication.THROUGH_TOPIC + "-value";

final SchemaRegistryClient client = this.schemaRegistryMockExtension.getSchemaRegistryClient();
Expand Down Expand Up @@ -469,7 +469,7 @@ void shouldCallCleanupHookForInternalTopics() {
this.app = this.createComplexCleanUpHookApplication();

this.runCleanUp();
final String uniqueAppId = this.app.getUniqueAppId();
final String uniqueAppId = this.app.getStreamsApplicationId();
verify(this.topicCleanUpHook).accept(uniqueAppId + "-KSTREAM-AGGREGATE-STATE-STORE-0000000008-repartition");
verify(this.topicCleanUpHook).accept(uniqueAppId + "-KSTREAM-AGGREGATE-STATE-STORE-0000000008-changelog");
verify(this.topicCleanUpHook).accept(uniqueAppId + "-KSTREAM-REDUCE-STATE-STORE-0000000003-changelog");
Expand All @@ -481,7 +481,7 @@ void shouldCallCleanUpHookForAllTopics() {
this.app = this.createComplexCleanUpHookApplication();

this.runCleanUpWithDeletion();
final String uniqueAppId = this.app.getUniqueAppId();
final String uniqueAppId = this.app.getStreamsApplicationId();
verify(this.topicCleanUpHook).accept(uniqueAppId + "-KSTREAM-AGGREGATE-STATE-STORE-0000000008-repartition");
verify(this.topicCleanUpHook).accept(uniqueAppId + "-KSTREAM-AGGREGATE-STATE-STORE-0000000008-changelog");
verify(this.topicCleanUpHook).accept(uniqueAppId + "-KSTREAM-REDUCE-STATE-STORE-0000000003-changelog");
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/*
* MIT License
*
* Copyright (c) 2023 bakdata
* Copyright (c) 2024 bakdata
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
Expand Down Expand Up @@ -49,7 +49,7 @@ void setup() {
this.app = new ComplexTopologyApplication();
this.app.setInputTopics(List.of("input", "input2"));
this.app.setOutputTopic("output");
this.topologyInformation = new TopologyInformation(this.app.createTopology(), this.app.getUniqueAppId());
this.topologyInformation = new TopologyInformation(this.app.createTopology(), this.app.getStreamsApplicationId());
}

@Test
Expand Down Expand Up @@ -131,7 +131,7 @@ void shouldNotReturnInputTopics() {
void shouldReturnAllInternalTopics() {
assertThat(this.topologyInformation.getInternalTopics())
.hasSize(3)
.allMatch(topic -> topic.contains("-KSTREAM-") && topic.startsWith(this.app.getUniqueAppId())
.allMatch(topic -> topic.contains("-KSTREAM-") && topic.startsWith(this.app.getStreamsApplicationId())
|| topic.startsWith("KSTREAM-"))
.allMatch(topic -> topic.endsWith("-changelog") || topic.endsWith("-repartition"));
}
Expand Down
Loading