Add --unique-app-id parameter #197

Closed
wants to merge 14 commits into from
3 changes: 3 additions & 0 deletions README.md
@@ -111,6 +111,9 @@ The following configuration options are available:
- `--extra-output-topics`: Additional named output topics if you need to specify multiple topics with different message
types (`<String=String>[,<String=String>...]`)

- `--unique-app-id`: Unique application ID to use for Kafka Streams. Can also be provided by implementing
#getUniqueAppId()

- `--volatile-group-instance-id`: Whether the group instance id is volatile, i.e., it will change on a Streams shutdown.

- `--clean-up`: Whether the state of the Kafka Streams app, i.e., offsets and state stores and auto-created topics,
1 change: 1 addition & 0 deletions azure-pipelines.yml
@@ -19,6 +19,7 @@ resources:
- repository: templates
type: github
name: bakdata/bakdata-project-templates
ref: tmp/upload-snapshot
endpoint: bot

jobs:
4 changes: 4 additions & 0 deletions charts/streams-app-cleanup-job/templates/job.yaml
@@ -112,6 +112,10 @@ spec:
- name: "{{ .Values.configurationEnvPrefix }}_EXTRA_INPUT_PATTERNS"
value: "{{- range $key, $value := .Values.streams.extraInputPatterns }}{{ $key }}={{ $value }},{{- end }}"
{{- end }}
{{- if and (.Values.streams.passApplicationId) (hasKey .Values.autoscaling "consumerGroup") }}
- name: "{{ .Values.configurationEnvPrefix }}_UNIQUE_APP_ID"
value: {{ .Values.autoscaling.consumerGroup | quote }}
{{- end }}
{{- range $key, $value := .Values.secrets }}
- name: "{{ $key }}"
valueFrom:
4 changes: 4 additions & 0 deletions charts/streams-app-cleanup-job/values.yaml
@@ -15,6 +15,7 @@ configurationEnvPrefix: "APP"
streams:
# brokers: "test:9092"
# schemaRegistryUrl: "url:1234"
passApplicationId: true

Contributor: I think the default should be `false` if your goal is for this change to be non-breaking.

Member Author: The new Helm chart should enable it by default, but old apps can still use the new chart. We did a similar thing for `optimizeLeaveGroupBehavior`.

config: {}
# max.poll.records: 500
# Note that YAML may convert large integers to scientific notation. Use Strings to avoid this.
@@ -59,6 +60,9 @@ labels: {}

# serviceAccountName: foo

autoscaling: {}
# consumerGroup: foo

tolerations: []
# - key: "foo"
# operator: "Exists"
3 changes: 2 additions & 1 deletion charts/streams-app/README.md
@@ -49,11 +49,12 @@ Alternatively, a YAML file that specifies the values for the parameters can be p
### Streams

| Parameter | Description | Default |
| ------------------------------------ | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ------- |
|--------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|---------|
| `streams.brokers` | Comma separated list of Kafka brokers to connect to. | |
| `streams.schemaRegistryUrl` | URL of Schema Registry to connect to. | `null` |
| `streams.staticMembership` | Whether to use [Kafka Static Group Membership](https://cwiki.apache.org/confluence/display/KAFKA/KIP-345%3A+Introduce+static+membership+protocol+to+reduce+consumer+rebalances). | `false` |
| `streams.optimizeLeaveGroupBehavior` | Enabling this optimizes the leave group behavior when a pod is terminated. Depends on the deployment kind, i.e., `statefulSet`. Requires the app to use streams-bootstrap 2.7+. | `true` |
| `streams.passApplicationId` | Configure the unique application ID via `autoscaling.consumerGroup`. Requires the app to use streams-bootstrap 2.17+. | `true` |
| `streams.config` | Configurations for your [Kafka Streams app](https://kafka.apache.org/documentation/#streamsconfigs). | `{}` |
| `streams.inputTopics` | List of input topics for your streams application. | `[]` |
| `streams.extraInputTopics` | Map of additional named input topics if you need to specify multiple topics with different message types. | `{}` |
4 changes: 4 additions & 0 deletions charts/streams-app/templates/deployment.yaml
@@ -175,6 +175,10 @@ spec:
- name: "{{ .Values.configurationEnvPrefix }}_EXTRA_INPUT_PATTERNS"
value: "{{- range $key, $value := .Values.streams.extraInputPatterns }}{{ $key }}={{ $value }},{{- end }}"
{{- end }}
{{- if and (.Values.streams.passApplicationId) (hasKey .Values.autoscaling "consumerGroup") }}
- name: "{{ .Values.configurationEnvPrefix }}_UNIQUE_APP_ID"
value: {{ .Values.autoscaling.consumerGroup | quote }}
{{- end }}
{{- range $key, $value := .Values.secrets }}
- name: "{{ $key }}"
valueFrom:
1 change: 1 addition & 0 deletions charts/streams-app/values.yaml
@@ -27,6 +27,7 @@ streams:
# schemaRegistryUrl: "url:1234"
staticMembership: false
optimizeLeaveGroupBehavior: true
passApplicationId: true
config: {}
# max.poll.records: 500
# Note that YAML may convert large integers to scientific notation. Use Strings to avoid this.
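
The Helm condition added to `job.yaml` and `deployment.yaml` can be summarized as: emit `<prefix>_UNIQUE_APP_ID` only when `streams.passApplicationId` is enabled and `autoscaling.consumerGroup` is set. The following is a minimal Java sketch of that rule, not code from this PR. The class `UniqueAppIdEnv` and its methods are hypothetical, and the assumption that the environment variable name corresponds to the `--unique-app-id` option (prefix, underscores, upper case) is inferred only from the names used in the templates.

```java
import java.util.Locale;
import java.util.Map;
import java.util.Optional;

// Hypothetical helper, for illustration only; not part of streams-bootstrap.
final class UniqueAppIdEnv {

    private UniqueAppIdEnv() {
    }

    // Assumed naming rule: "--unique-app-id" with prefix "APP" becomes "APP_UNIQUE_APP_ID".
    static String envName(final String prefix, final String cliOption) {
        final String bare = cliOption.replaceFirst("^--", "");
        return prefix + "_" + bare.replace('-', '_').toUpperCase(Locale.ROOT);
    }

    // Mirrors the template condition:
    // and (.Values.streams.passApplicationId) (hasKey .Values.autoscaling "consumerGroup")
    static Optional<String> uniqueAppIdEnv(final String prefix, final boolean passApplicationId,
            final Map<String, String> autoscaling) {
        if (!passApplicationId || !autoscaling.containsKey("consumerGroup")) {
            return Optional.empty();
        }
        return Optional.of(envName(prefix, "--unique-app-id") + "=" + autoscaling.get("consumerGroup"));
    }
}
```

With the default prefix `APP` and `autoscaling.consumerGroup: foo`, this sketch yields `APP_UNIQUE_APP_ID=foo`. With `passApplicationId: false`, or with an empty `autoscaling` map as in the chart defaults, nothing is emitted.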
@@ -1,7 +1,7 @@
/*
* MIT License
*
- * Copyright (c) 2023 bakdata
+ * Copyright (c) 2024 bakdata
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -94,6 +94,14 @@ public abstract class KafkaStreamsApplication extends KafkaApplication implement
@CommandLine.Option(names = "--volatile-group-instance-id", arity = "0..1",
description = "Whether the group instance id is volatile, i.e., it will change on a Streams shutdown.")
private boolean volatileGroupInstanceId;
/**
* This must be set to a unique value for every application interacting with your kafka cluster to ensure internal
* state encapsulation. Could be set to: className-inputTopic-outputTopic
*/
@CommandLine.Option(names = "--unique-app-id",
description = "Unique application ID to use for Kafka Streams. Can also be provided by implementing "
+ "#getUniqueAppId()")
private String uniqueAppId;
private KafkaStreams streams;
private Throwable lastException;

@@ -155,12 +163,6 @@ public void close() {
*/
public abstract void buildTopology(StreamsBuilder builder);

- /**
- * This must be set to a unique value for every application interacting with your kafka cluster to ensure internal
- * state encapsulation. Could be set to: className-inputTopic-outputTopic
- */
- public abstract String getUniqueAppId();

/**
* Create the topology of the Kafka Streams app
*
@@ -271,13 +273,31 @@ protected Properties createKafkaProperties() {
kafkaConfig.setProperty(StreamsConfig.producerPrefix(ProducerConfig.COMPRESSION_TYPE_CONFIG), "gzip");

// topology
- kafkaConfig.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, this.getUniqueAppId());
+ kafkaConfig.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, this.getApplicationId());

this.configureDefaultSerde(kafkaConfig);
kafkaConfig.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, this.getBrokers());
return kafkaConfig;
}

private String getApplicationId() {

Contributor: I guess it would clutter the API even more and cause confusion alongside getUniqueAppId(), but I feel like we will need to call it elsewhere.

Member Author: Hm, this method performs the validation whenever Kafka properties are created. So getUniqueAppId() should still return the correct result unless a user explicitly overrides the method and it returns null. I don't think this will ever happen.

Contributor: And I think we need a test for this method.

Member Author: Will add tests, I just wanted to have your thoughts first.

final String uniqueAppId = this.getUniqueAppId();
if (uniqueAppId == null) {
if (this.uniqueAppId == null) {
throw new IllegalArgumentException("Must pass --unique-app-id or implement #getUniqueAppId()");
}
return this.uniqueAppId;
}
if (this.uniqueAppId == null) {
return uniqueAppId;
}
if (!uniqueAppId.equals(this.uniqueAppId)) {
throw new IllegalArgumentException(
"Application ID provided via --unique-app-id does not match #getUniqueAppId()");
}
return uniqueAppId;
}
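
The resolution rules in `getApplicationId()` amount to a small decision table over two optional sources. The following is a minimal standalone sketch of those rules, not the code from this PR: `ApplicationIdResolver` and `resolve` are hypothetical names, and the two parameters stand in for the value returned by `getUniqueAppId()` and the value passed via `--unique-app-id`.

```java
// Hypothetical standalone helper, for illustration only.
final class ApplicationIdResolver {

    private ApplicationIdResolver() {
    }

    /**
     * Resolves the effective application ID.
     *
     * @param fromMethod value returned by getUniqueAppId(), or null if not implemented
     * @param fromCli value passed via --unique-app-id, or null if not passed
     */
    static String resolve(final String fromMethod, final String fromCli) {
        // Neither source is set: there is no ID to use.
        if (fromMethod == null && fromCli == null) {
            throw new IllegalArgumentException("Must pass --unique-app-id or implement #getUniqueAppId()");
        }
        // Only the CLI value is set.
        if (fromMethod == null) {
            return fromCli;
        }
        // Only the method value is set, or both are set and agree.
        if (fromCli == null || fromMethod.equals(fromCli)) {
            return fromMethod;
        }
        // Both are set but differ: refuse to guess.
        throw new IllegalArgumentException(
                "Application ID provided via --unique-app-id does not match #getUniqueAppId()");
    }
}
```

For example, `resolve("my-app", null)` and `resolve(null, "my-app")` both return `"my-app"`, while `resolve("a", "b")` and `resolve(null, null)` throw an `IllegalArgumentException`.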

/**
* Run the Streams application. This method blocks until Kafka Streams has completed shutdown, either because it
* caught an error or the application has received a shutdown event.
@@ -322,7 +342,7 @@ protected void runCleanUp() {
try (final ImprovedAdminClient adminClient = this.createAdminClient()) {
final CleanUpRunner cleanUpRunner = CleanUpRunner.builder()
.topology(this.createTopology())
- .appId(this.getUniqueAppId())
+ .appId(this.getApplicationId())
.adminClient(adminClient)
.streams(this.streams)
.build();