
Commit

Create v3
philipp94831 committed Apr 5, 2024
1 parent bee1b33 commit 3cb59d8
Showing 4 changed files with 146 additions and 38 deletions.
@@ -136,10 +136,17 @@ public void clean() {
}

/**
* Stop all applications that have been started by {@link #run()}.
* @see #stop()
*/
@Override
public void close() {
this.stop();
}

/**
* Stop all applications that have been started by {@link #run()}.
*/
public void stop() {
this.runningApps.forEach(RunningApp::close);
}
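
With this change, close() simply delegates to the new stop() method: a KafkaStreamsApplication can be managed with try-with-resources, while callers that control the lifecycle themselves call stop() directly. A minimal usage sketch, assuming a hypothetical subclass MyApp and illustrative CLI arguments:

// Sketch only: MyApp stands in for any KafkaStreamsApplication subclass
final String[] args = {"--brokers", "brokers", "--input-topics", "in", "--output-topic", "out"};

// try-with-resources: close() runs on exit and delegates to stop()
try (final KafkaStreamsApplication app = new MyApp()) {
    KafkaApplication.startApplicationWithoutExit(app, args);
}

// explicit lifecycle: stop() closes every RunningApp started by run()
final KafkaStreamsApplication manual = new MyApp();
KafkaApplication.startApplicationWithoutExit(manual, args);
manual.stop();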

@@ -263,7 +263,7 @@ public String getUniqueAppId(final StreamsTopicConfig topics) {

@Test
void shouldParseArguments() {
final KafkaStreamsApplication app = new KafkaStreamsApplication() {
try (final KafkaStreamsApplication app = new KafkaStreamsApplication() {
@Override
public StreamsApp createApp(final boolean cleanUp) {
return new StreamsApp() {
@@ -283,39 +283,40 @@ public String getUniqueAppId(final StreamsTopicConfig topics) {
public void run() {
// do nothing
}
};
KafkaApplication.startApplicationWithoutExit(app, new String[]{
"--brokers", "brokers",
"--schema-registry-url", "schema-registry",
"--input-topics", "input1,input2",
"--extra-input-topics", "role1=input3,role2=input4;input5",
"--input-pattern", ".*",
"--extra-input-patterns", "role1=.+,role2=\\d+",
"--output-topic", "output1",
"--extra-output-topics", "role1=output2,role2=output3",
"--kafka-config", "foo=1,bar=2",
});
assertThat(app.getInputTopics()).containsExactly("input1", "input2");
assertThat(app.getExtraInputTopics())
.hasSize(2)
.containsEntry("role1", List.of("input3"))
.containsEntry("role2", List.of("input4", "input5"));
assertThat(app.getInputPattern())
.satisfies(pattern -> assertThat(pattern.pattern()).isEqualTo(Pattern.compile(".*").pattern()));
assertThat(app.getExtraInputPatterns())
.hasSize(2)
.hasEntrySatisfying("role1",
pattern -> assertThat(pattern.pattern()).isEqualTo(Pattern.compile(".+").pattern()))
.hasEntrySatisfying("role2",
pattern -> assertThat(pattern.pattern()).isEqualTo(Pattern.compile("\\d+").pattern()));
assertThat(app.getOutputTopic()).isEqualTo("output1");
assertThat(app.getExtraOutputTopics())
.hasSize(2)
.containsEntry("role1", "output2")
.containsEntry("role2", "output3");
assertThat(app.getKafkaConfig())
.hasSize(2)
.containsEntry("foo", "1")
.containsEntry("bar", "2");
}) {
KafkaApplication.startApplicationWithoutExit(app, new String[]{
"--brokers", "brokers",
"--schema-registry-url", "schema-registry",
"--input-topics", "input1,input2",
"--extra-input-topics", "role1=input3,role2=input4;input5",
"--input-pattern", ".*",
"--extra-input-patterns", "role1=.+,role2=\\d+",
"--output-topic", "output1",
"--extra-output-topics", "role1=output2,role2=output3",
"--kafka-config", "foo=1,bar=2",
});
assertThat(app.getInputTopics()).containsExactly("input1", "input2");
assertThat(app.getExtraInputTopics())
.hasSize(2)
.containsEntry("role1", List.of("input3"))
.containsEntry("role2", List.of("input4", "input5"));
assertThat(app.getInputPattern())
.satisfies(pattern -> assertThat(pattern.pattern()).isEqualTo(Pattern.compile(".*").pattern()));
assertThat(app.getExtraInputPatterns())
.hasSize(2)
.hasEntrySatisfying("role1",
pattern -> assertThat(pattern.pattern()).isEqualTo(Pattern.compile(".+").pattern()))
.hasEntrySatisfying("role2",
pattern -> assertThat(pattern.pattern()).isEqualTo(Pattern.compile("\\d+").pattern()));
assertThat(app.getOutputTopic()).isEqualTo("output1");
assertThat(app.getExtraOutputTopics())
.hasSize(2)
.containsEntry("role1", "output2")
.containsEntry("role2", "output3");
assertThat(app.getKafkaConfig())
.hasSize(2)
.containsEntry("foo", "1")
.containsEntry("bar", "2");
}
}
}
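
Wrapping the application in try-with-resources here guarantees that close(), and thus the new stop(), runs even if one of the assertions fails, so the test no longer leaks the parsed application.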
@@ -0,0 +1,66 @@
/*
* MIT License
*
* Copyright (c) 2024 bakdata
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package com.bakdata.kafka;

import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import org.apache.kafka.streams.kstream.KStream;

@NoArgsConstructor
@Getter
@Setter
public class CloseFlagApp extends KafkaStreamsApplication {

private boolean closed = false;
private boolean appClosed = false;

@Override
public void close() {
super.close();
this.closed = true;
}

@Override
protected StreamsApp createApp(final boolean cleanUp) {
return new StreamsApp() {
@Override
public void buildTopology(final TopologyBuilder builder) {
final KStream<String, String> input = builder.streamInput();
input.to(builder.getTopics().getOutputTopic());
}

@Override
public String getUniqueAppId(final StreamsTopicConfig topics) {
return this.getClass().getSimpleName() + "-" + topics.getOutputTopic();
}

@Override
public void close() {
CloseFlagApp.this.appClosed = true;
}
};
}
}
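
The new CloseFlagApp helper records whether close() was called on the application itself (closed) and on the StreamsApp it creates (appClosed); the shouldCallClose test below uses these flags to verify that running, cleaning, and resetting each close the underlying StreamsApp.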
@@ -29,6 +29,7 @@
import static net.mguenther.kafka.junit.EmbeddedKafkaClusterConfig.defaultClusterConfig;
import static net.mguenther.kafka.junit.Wait.delay;

import com.bakdata.kafka.CloseFlagApp;
import com.bakdata.kafka.KafkaStreamsApplication;
import com.bakdata.kafka.SimpleKafkaStreamsApplication;
import com.bakdata.kafka.test_applications.WordCount;
@@ -42,6 +43,7 @@
import net.mguenther.kafka.junit.KeyValue;
import net.mguenther.kafka.junit.ReadKeyValues;
import net.mguenther.kafka.junit.SendValuesTransactional;
import net.mguenther.kafka.junit.TopicConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.streams.StreamsConfig;
@@ -68,7 +70,7 @@ class StreamsCleanUpTest {

private static void runAppAndClose(final KafkaStreamsApplication app) throws InterruptedException {
runApp(app);
app.close();
app.stop();
}

private static void runApp(final KafkaStreamsApplication app) throws InterruptedException {
@@ -144,6 +146,34 @@ void shouldReset() throws InterruptedException {
}
}

@Test
void shouldCallClose() throws InterruptedException {
try (final CloseFlagApp app = this.createCloseFlagApplication()) {
this.kafkaCluster.createTopic(TopicConfig.withName(app.getInputTopics().get(0)).useDefaults());
delay(TIMEOUT_SECONDS, TimeUnit.SECONDS);
this.softly.assertThat(app.isClosed()).isFalse();
this.softly.assertThat(app.isAppClosed()).isFalse();
// if we don't run the app, the coordinator will be unavailable
runAppAndClose(app);
this.softly.assertThat(app.isAppClosed()).isTrue();
app.setAppClosed(false);
delay(TIMEOUT_SECONDS, TimeUnit.SECONDS);
app.clean();
this.softly.assertThat(app.isAppClosed()).isTrue();
app.setAppClosed(false);
delay(TIMEOUT_SECONDS, TimeUnit.SECONDS);
app.reset();
this.softly.assertThat(app.isAppClosed()).isTrue();
}
}

private CloseFlagApp createCloseFlagApplication() {
final CloseFlagApp app = new CloseFlagApp();
app.setInputTopics(List.of("input"));
app.setOutputTopic("output");
return this.configure(app);
}

private List<KeyValue<String, Long>> readOutputTopic(final String outputTopic) throws InterruptedException {
final ReadKeyValues<String, Long> readRequest = ReadKeyValues.from(outputTopic, Long.class)
.with(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class).build();
@@ -164,12 +194,16 @@ private void runAndAssertContent(final Iterable<? extends KeyValue<String, Long>
private KafkaStreamsApplication createWordCountApplication() {
final KafkaStreamsApplication application = new SimpleKafkaStreamsApplication(WordCount::new);
application.setOutputTopic("word_output");
application.setInputTopics(List.of("word_input"));
return this.configure(application);
}

private <T extends KafkaStreamsApplication> T configure(final T application) {
application.setBrokers(this.kafkaCluster.getBrokerList());
application.setKafkaConfig(Map.of(
StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, "0",
ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000"
));
application.setInputTopics(List.of("word_input"));
return application;
}
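
Note that the input-topic assignment moves from the shared configure helper into createWordCountApplication: configure is now also used by createCloseFlagApplication, which sets its own topics, so it can no longer hard-code word_input.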
