-
Notifications
You must be signed in to change notification settings - Fork 3
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add --unique-app-id parameter #197
Changes from all commits
3865e13
eafd69e
5fab21b
580fbb7
3d572de
615d351
1dbec03
32634a9
ec6d53d
6a4cc96
3d13af0
d78a1f6
31530ac
981ab00
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,7 +1,7 @@ | ||
/* | ||
* MIT License | ||
* | ||
* Copyright (c) 2023 bakdata | ||
* Copyright (c) 2024 bakdata | ||
* | ||
* Permission is hereby granted, free of charge, to any person obtaining a copy | ||
* of this software and associated documentation files (the "Software"), to deal | ||
|
@@ -94,6 +94,14 @@ public abstract class KafkaStreamsApplication extends KafkaApplication implement | |
@CommandLine.Option(names = "--volatile-group-instance-id", arity = "0..1", | ||
description = "Whether the group instance id is volatile, i.e., it will change on a Streams shutdown.") | ||
private boolean volatileGroupInstanceId; | ||
/** | ||
* This must be set to a unique value for every application interacting with your kafka cluster to ensure internal | ||
* state encapsulation. Could be set to: className-inputTopic-outputTopic | ||
*/ | ||
@CommandLine.Option(names = "--unique-app-id", | ||
description = "Unique application ID to use for Kafka Streams. Can also be provided by implementing " | ||
+ "#getUniqueAppId()") | ||
private String uniqueAppId; | ||
private KafkaStreams streams; | ||
private Throwable lastException; | ||
|
||
|
@@ -155,12 +163,6 @@ public void close() { | |
*/ | ||
public abstract void buildTopology(StreamsBuilder builder); | ||
|
||
/** | ||
* This must be set to a unique value for every application interacting with your kafka cluster to ensure internal | ||
* state encapsulation. Could be set to: className-inputTopic-outputTopic | ||
*/ | ||
public abstract String getUniqueAppId(); | ||
|
||
/** | ||
* Create the topology of the Kafka Streams app | ||
* | ||
|
@@ -271,13 +273,31 @@ protected Properties createKafkaProperties() { | |
kafkaConfig.setProperty(StreamsConfig.producerPrefix(ProducerConfig.COMPRESSION_TYPE_CONFIG), "gzip"); | ||
|
||
// topology | ||
kafkaConfig.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, this.getUniqueAppId()); | ||
kafkaConfig.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, this.getApplicationId()); | ||
|
||
this.configureDefaultSerde(kafkaConfig); | ||
kafkaConfig.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, this.getBrokers()); | ||
return kafkaConfig; | ||
} | ||
|
||
private String getApplicationId() { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I guess it would clutter the API even more and lead to confusion together with There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hm, this method does the validation whenever Kafka properties are created. So getUniqueAppId() should still return the correct result unless a user explicitly overrides the method and it returns null. I don't think this will ever happen There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. And I think we need a test for this method There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Will add tests, just wanted to have your thoughts first |
||
final String uniqueAppId = this.getUniqueAppId(); | ||
raminqaf marked this conversation as resolved.
Show resolved
Hide resolved
|
||
if (uniqueAppId == null) { | ||
if (this.uniqueAppId == null) { | ||
throw new IllegalArgumentException("Must pass --unique-app-id or implement #getUniqueAppId()"); | ||
} | ||
return this.uniqueAppId; | ||
} | ||
if (this.uniqueAppId == null) { | ||
return uniqueAppId; | ||
} | ||
if (!uniqueAppId.equals(this.uniqueAppId)) { | ||
throw new IllegalArgumentException( | ||
"Application ID provided via --unique-app-id does not match #getUniqueAppId()"); | ||
} | ||
return uniqueAppId; | ||
} | ||
|
||
/** | ||
* Run the Streams application. This method blocks until Kafka Streams has completed shutdown, either because it | ||
* caught an error or the application has received a shutdown event. | ||
|
@@ -322,7 +342,7 @@ protected void runCleanUp() { | |
try (final ImprovedAdminClient adminClient = this.createAdminClient()) { | ||
final CleanUpRunner cleanUpRunner = CleanUpRunner.builder() | ||
.topology(this.createTopology()) | ||
.appId(this.getUniqueAppId()) | ||
.appId(this.getApplicationId()) | ||
.adminClient(adminClient) | ||
.streams(this.streams) | ||
.build(); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the default should be false if your goal is that it is non-breaking
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
New helm chart should have it as default but old apps can still use new chart. We did a similar thing for optimizeLeaveGroupBehavior