Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add --unique-app-id parameter #197

Closed
wants to merge 14 commits into from
Closed
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions azure-pipelines.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ resources:

jobs:
- template: azure/gradle/build.yml@templates
parameters:
jdkVersion: '1.17'
- template: azure/gradle/create_tag_version.yml@templates
- template: azure/gradle/upload_release.yml@templates
- template: azure/gradle/upload_snapshot.yml@templates
4 changes: 2 additions & 2 deletions build.gradle.kts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
plugins {
id("net.researchgate.release") version "3.0.2"
id("com.bakdata.sonar") version "1.1.7"
id("com.bakdata.sonatype") version "1.1.7"
id("com.bakdata.sonar") version "1.1.9"
id("com.bakdata.sonatype") version "1.1.9"
id("org.hildan.github.changelog") version "1.12.1"
id("io.freefair.lombok") version "6.6.1"
}
Expand Down
4 changes: 4 additions & 0 deletions charts/streams-app-cleanup-job/templates/job.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,10 @@ spec:
- name: "{{ .Values.configurationEnvPrefix }}_EXTRA_INPUT_PATTERNS"
value: "{{- range $key, $value := .Values.streams.extraInputPatterns }}{{ $key }}={{ $value }},{{- end }}"
{{- end }}
{{- if hasKey .Values.autoscaling "consumerGroup" }}
- name: "{{ .Values.configurationEnvPrefix }}_APPLICATION_ID"
value: {{ .Values.autoscaling.consumerGroup | quote }}
{{- end }}
{{- range $key, $value := .Values.secrets }}
- name: "{{ $key }}"
valueFrom:
Expand Down
3 changes: 3 additions & 0 deletions charts/streams-app-cleanup-job/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ labels: {}

# serviceAccountName: foo

autoscaling: {}
# consumerGroup: foo

tolerations: []
# - key: "foo"
# operator: "Exists"
Expand Down
4 changes: 4 additions & 0 deletions charts/streams-app/templates/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,10 @@ spec:
- name: "{{ .Values.configurationEnvPrefix }}_EXTRA_INPUT_PATTERNS"
value: "{{- range $key, $value := .Values.streams.extraInputPatterns }}{{ $key }}={{ $value }},{{- end }}"
{{- end }}
{{- if hasKey .Values.autoscaling "consumerGroup" }}
- name: "{{ .Values.configurationEnvPrefix }}_APPLICATION_ID"
value: {{ .Values.autoscaling.consumerGroup | quote }}
{{- end }}
{{- range $key, $value := .Values.secrets }}
- name: "{{ $key }}"
valueFrom:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/*
* MIT License
*
* Copyright (c) 2023 bakdata
* Copyright (c) 2024 bakdata
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
Expand Down Expand Up @@ -94,6 +94,8 @@ public abstract class KafkaStreamsApplication extends KafkaApplication implement
@CommandLine.Option(names = "--volatile-group-instance-id", arity = "0..1",
description = "Whether the group instance id is volatile, i.e., it will change on a Streams shutdown.")
private boolean volatileGroupInstanceId;
@CommandLine.Option(names = "--application-id", description = "Application ID to use for Kafka Streams")
private String uniqueAppId;
private KafkaStreams streams;
private Throwable lastException;

Expand Down Expand Up @@ -155,12 +157,6 @@ public void close() {
*/
public abstract void buildTopology(StreamsBuilder builder);

/**
* This must be set to a unique value for every application interacting with your kafka cluster to ensure internal
* state encapsulation. Could be set to: className-inputTopic-outputTopic
*/
public abstract String getUniqueAppId();

/**
* Create the topology of the Kafka Streams app
*
Expand Down Expand Up @@ -271,13 +267,31 @@ protected Properties createKafkaProperties() {
kafkaConfig.setProperty(StreamsConfig.producerPrefix(ProducerConfig.COMPRESSION_TYPE_CONFIG), "gzip");

// topology
kafkaConfig.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, this.getUniqueAppId());
kafkaConfig.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, this.getApplicationId());

this.configureDefaultSerde(kafkaConfig);
kafkaConfig.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, this.getBrokers());
return kafkaConfig;
}

private String getApplicationId() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess it would clutter the API even more and lead to confusion together with getUniqueAppId(), but I feel like we will need to call it elsewhere

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm, this method does the validation whenever Kafka properties are created. So getUniqueAppId() should still return the correct result unless a user explicitly overrides the method and it returns null. I don't think this will ever happen

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And I think we need a test for this method

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will add tests, just wanted to have your thoughts first

final String uniqueAppId = this.getUniqueAppId();
raminqaf marked this conversation as resolved.
Show resolved Hide resolved
if (uniqueAppId == null) {
if (this.uniqueAppId == null) {
throw new IllegalArgumentException("Must pass --application-id or implement #getUniqueAppId()");
raminqaf marked this conversation as resolved.
Show resolved Hide resolved
}
return this.uniqueAppId;
}
if (this.uniqueAppId == null) {
return uniqueAppId;
}
if (!uniqueAppId.equals(this.uniqueAppId)) {
throw new IllegalArgumentException(
"Application ID provided via --application-id does not match #getUniqueAppId()");
}
return uniqueAppId;
}

/**
* Run the Streams application. This method blocks until Kafka Streams has completed shutdown, either because it
* caught an error or the application has received a shutdown event.
Expand Down Expand Up @@ -322,7 +336,7 @@ protected void runCleanUp() {
try (final ImprovedAdminClient adminClient = this.createAdminClient()) {
final CleanUpRunner cleanUpRunner = CleanUpRunner.builder()
.topology(this.createTopology())
.appId(this.getUniqueAppId())
.appId(this.getApplicationId())
.adminClient(adminClient)
.streams(this.streams)
.build();
Expand Down
Loading