Skip to content

Commit

Permalink
Add CLI parameter to specify streams application ID (#243)
Browse files Browse the repository at this point in the history
  • Loading branch information
philipp94831 authored Jul 29, 2024
1 parent ecf41c8 commit 878e22b
Show file tree
Hide file tree
Showing 8 changed files with 63 additions and 11 deletions.
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,9 @@ The following configuration options are available:
- `--labeled-output-topics`: Additional labeled output topics if you need to specify multiple topics with different
message types (`String=String>[,<String=String>...]`)

- `--application-id`: Unique application ID to use for Kafka Streams. Can also be provided by
implementing `StreamsApp#getUniqueAppId()`

- `--volatile-group-instance-id`: Whether the group instance id is volatile, i.e., it will change on a Streams shutdown.

Additionally, the following commands are available:
Expand Down
4 changes: 4 additions & 0 deletions charts/streams-app-cleanup-job/templates/job.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,10 @@ spec:
- name: "{{ .Values.configurationEnvPrefix }}_LABELED_INPUT_PATTERNS"
value: "{{- range $key, $value := .Values.kafka.labeledInputPatterns }}{{ $key }}={{ $value }},{{- end }}"
{{- end }}
{{- if hasKey .Values.kafka "applicationId" }}
- name: "{{ .Values.configurationEnvPrefix }}_APPLICATION_ID"
value: {{ .Values.kafka.applicationId | quote }}
{{- end }}
{{- range $key, $value := .Values.secrets }}
- name: "{{ $key }}"
valueFrom:
Expand Down
2 changes: 1 addition & 1 deletion charts/streams-app/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ Alternatively, a YAML file that specifies the values for the parameters can be p
| `kafka.outputTopic` | Output topic for your streams application. | |
| `kafka.labeledOutputTopics` | Map of additional labeled output topics if you need to specify multiple topics with different message types. | `{}` |
| `kafka.errorTopic` | Error topic for your streams application. | |
| `kafka.applicationId` | Unique application ID for Kafka Streams. Required for auto-scaling | |

### Other

Expand Down Expand Up @@ -99,7 +100,6 @@ Alternatively, a YAML file that specifies the values for the parameters can be p
| Parameter | Description | Default |
| -------------------------------- | ------------------------------------------------------------------------------------------------------------------ | ---------- |
| `autoscaling.enabled` | Whether to enable auto-scaling using [KEDA](https://keda.sh/docs/latest/scalers/apache-kafka/). | `false` |
| `autoscaling.consumerGroup` | Name of the consumer group used for checking the offset on the topic and processing the related lag. | |
| `autoscaling.lagThreshold` | Average target value to trigger scaling actions. | |
| `autoscaling.pollingInterval` | https://keda.sh/docs/2.10/concepts/scaling-deployments/#pollinginterval | `30` |
| `autoscaling.cooldownPeriod` | https://keda.sh/docs/2.10/concepts/scaling-deployments/#cooldownperiod | `300` |
Expand Down
10 changes: 7 additions & 3 deletions charts/streams-app/templates/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,13 @@ kind: Deployment
{{- end }}
metadata:
name: {{ template "streams-app.fullname" . }}
{{- if or .Values.autoscaling.consumerGroup .Values.annotations }}
{{- if or .Values.kafka.applicationId .Values.annotations }}
annotations:
{{- range $key, $value := .Values.annotations }}
{{ $key | quote }}: {{ $value | quote }}
{{- end }}
{{- if and .Values.autoscaling.consumerGroup (not .Values.annotations.consumerGroup) }}
consumerGroup: {{ .Values.autoscaling.consumerGroup | quote }}
{{- if and .Values.kafka.applicationId (not .Values.annotations.consumerGroup) }}
consumerGroup: {{ .Values.kafka.applicationId | quote }}
{{- end }}
{{- end }}
labels:
Expand Down Expand Up @@ -152,6 +152,10 @@ spec:
- name: "{{ .Values.configurationEnvPrefix }}_LABELED_INPUT_PATTERNS"
value: "{{- range $key, $value := .Values.kafka.labeledInputPatterns }}{{ $key }}={{ $value }},{{- end }}"
{{- end }}
{{- if hasKey .Values.kafka "applicationId" }}
- name: "{{ .Values.configurationEnvPrefix }}_APPLICATION_ID"
value: {{ .Values.kafka.applicationId | quote }}
{{- end }}
{{- range $key, $value := .Values.secrets }}
- name: "{{ $key }}"
valueFrom:
Expand Down
10 changes: 5 additions & 5 deletions charts/streams-app/templates/scaled-object.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ spec:
- type: kafka
metadata:
bootstrapServers: {{ $root.Values.kafka.bootstrapServers }}
consumerGroup: {{ $root.Values.autoscaling.consumerGroup }}
consumerGroup: {{ $root.Values.kafka.applicationId }}
topic: {{ . | quote }}
lagThreshold: {{ $root.Values.autoscaling.lagThreshold | quote }}
offsetResetPolicy: {{ $root.Values.autoscaling.offsetResetPolicy }}
Expand All @@ -44,16 +44,16 @@ spec:
- type: kafka
metadata:
bootstrapServers: {{ $root.Values.kafka.bootstrapServers }}
consumerGroup: {{ $root.Values.autoscaling.consumerGroup }}
topic: {{ printf "%s-%s" $root.Values.autoscaling.consumerGroup . | quote }}
consumerGroup: {{ $root.Values.kafka.applicationId }}
topic: {{ printf "%s-%s" $root.Values.kafka.applicationId . | quote }}
lagThreshold: {{ $root.Values.autoscaling.lagThreshold | quote }}
offsetResetPolicy: {{ $root.Values.autoscaling.offsetResetPolicy }}
{{- end }}
{{- range .Values.autoscaling.topics }}
- type: kafka
metadata:
bootstrapServers: {{ $root.Values.kafka.bootstrapServers }}
consumerGroup: {{ $root.Values.autoscaling.consumerGroup }}
consumerGroup: {{ $root.Values.kafka.applicationId }}
topic: {{ . | quote }}
lagThreshold: {{ $root.Values.autoscaling.lagThreshold | quote }}
offsetResetPolicy: {{ $root.Values.autoscaling.offsetResetPolicy }}
Expand All @@ -63,7 +63,7 @@ spec:
- type: kafka
metadata:
bootstrapServers: {{ $root.Values.kafka.bootstrapServers }}
consumerGroup: {{ $root.Values.autoscaling.consumerGroup }}
consumerGroup: {{ $root.Values.kafka.applicationId }}
topic: {{ $topic | quote }}
lagThreshold: {{ $root.Values.autoscaling.lagThreshold | quote }}
offsetResetPolicy: {{ $root.Values.autoscaling.offsetResetPolicy }}
Expand Down
2 changes: 1 addition & 1 deletion charts/streams-app/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ kafka:
labeledOutputTopics: {}
# label: output
# errorTopic: error
# applicationId: foo

commandLine: {}
# MY_CLI_PARAM: "foo-bar"
Expand Down Expand Up @@ -97,7 +98,6 @@ jmx:

autoscaling:
enabled: false
# consumerGroup: foo
# lagThreshold: "1000"
pollingInterval: 30
cooldownPeriod: 300
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,10 @@ public abstract class KafkaStreamsApplication extends
@CommandLine.Option(names = "--volatile-group-instance-id", arity = "0..1",
description = "Whether the group instance id is volatile, i.e., it will change on a Streams shutdown.")
private boolean volatileGroupInstanceId;
@CommandLine.Option(names = "--application-id",
description = "Unique application ID to use for Kafka Streams. Can also be provided by implementing "
+ "StreamsApp#getUniqueAppId()")
private String applicationId;

/**
* Reset the Kafka Streams application. Additionally, delete the consumer group and all output and intermediate
Expand Down Expand Up @@ -135,7 +139,12 @@ public final StreamsTopicConfig createTopicConfig() {
@Override
public final ConfiguredStreamsApp<StreamsApp> createConfiguredApp(final StreamsApp app,
final AppConfiguration<StreamsTopicConfig> configuration) {
return new ConfiguredStreamsApp<>(app, configuration);
final ConfiguredStreamsApp<StreamsApp> configuredApp = new ConfiguredStreamsApp<>(app, configuration);
if (this.applicationId != null && !configuredApp.getUniqueAppId().equals(this.applicationId)) {
throw new IllegalArgumentException(
"Application ID provided via --application-id does not match StreamsApp#getUniqueAppId()");
}
return configuredApp;
}

/**
Expand Down
32 changes: 32 additions & 0 deletions streams-bootstrap-cli/src/test/java/com/bakdata/kafka/CliTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,38 @@ public void run() {
});
}

@Test
@ExpectSystemExitWithStatus(1)
void shouldExitWithErrorCodeOnInconsistentAppId() {
KafkaApplication.startApplication(new KafkaStreamsApplication() {
@Override
public StreamsApp createApp(final boolean cleanUp) {
return new StreamsApp() {
@Override
public void buildTopology(final TopologyBuilder builder) {
builder.streamInput().to(builder.getTopics().getOutputTopic());
}

@Override
public String getUniqueAppId(final StreamsTopicConfig topics) {
return "my-id";
}

@Override
public SerdeConfig defaultSerializationConfig() {
return new SerdeConfig(StringSerde.class, StringSerde.class);
}
};
}
}, new String[]{
"--bootstrap-servers", "localhost:9092",
"--schema-registry-url", "http://localhost:8081",
"--input-topics", "input",
"--output-topic", "output",
"--application-id", "my-other-id"
});
}

@Test
@ExpectSystemExitWithStatus(1)
void shouldExitWithErrorInTopology() throws InterruptedException {
Expand Down

0 comments on commit 878e22b

Please sign in to comment.