diff --git a/streams-bootstrap/src/test/java/com/bakdata/kafka/integration/RunStreamsAppTest.java b/streams-bootstrap/src/test/java/com/bakdata/kafka/integration/RunStreamsAppTest.java index 93b06c56..6fd7179b 100644 --- a/streams-bootstrap/src/test/java/com/bakdata/kafka/integration/RunStreamsAppTest.java +++ b/streams-bootstrap/src/test/java/com/bakdata/kafka/integration/RunStreamsAppTest.java @@ -58,6 +58,7 @@ import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.streams.KafkaStreams.State; import org.apache.kafka.streams.KafkaStreams.StateListener; +import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.errors.MissingSourceTopicException; import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler; @@ -96,6 +97,7 @@ static void run(final StreamsRunner runner) { static ConfiguredStreamsApp configureApp(final StreamsApp app, final StreamsTopicConfig topics) { final StreamsAppConfiguration configuration = StreamsAppConfiguration.builder() .kafkaConfig(Map.of( + StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, "0", ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000" )) .topics(topics) diff --git a/streams-bootstrap/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpTest.java b/streams-bootstrap/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpTest.java index 533124c2..5cd59dc1 100644 --- a/streams-bootstrap/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpTest.java +++ b/streams-bootstrap/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpTest.java @@ -26,7 +26,6 @@ import static com.bakdata.kafka.integration.RunStreamsAppTest.configureApp; -import static com.bakdata.kafka.integration.RunStreamsAppTest.run; import static net.mguenther.kafka.junit.EmbeddedKafkaCluster.provisionWith; import static net.mguenther.kafka.junit.EmbeddedKafkaClusterConfig.defaultClusterConfig; import static net.mguenther.kafka.junit.Wait.delay; @@ -35,11 +34,13 @@ import com.bakdata.kafka.CleanUpException; import com.bakdata.kafka.ConfiguredStreamsApp; +import com.bakdata.kafka.ExecutableApp; import com.bakdata.kafka.ExecutableStreamsApp; import com.bakdata.kafka.HasTopicHooks.TopicHook; import com.bakdata.kafka.KafkaEndpointConfig; import com.bakdata.kafka.StreamsApp; import com.bakdata.kafka.StreamsCleanUpConfiguration; +import com.bakdata.kafka.StreamsCleanUpRunner; import com.bakdata.kafka.StreamsRunner; import com.bakdata.kafka.StreamsTopicConfig; import com.bakdata.kafka.TestRecord; @@ -129,17 +130,17 @@ private static ConfiguredStreamsApp createMirrorKeyApplication() { .build()); } - private static void runCleanUp(final ExecutableStreamsApp app) { + private static void reset(final ExecutableApp app) { app.createCleanUpRunner().reset(); } - private static void runCleanUpWithDeletion(final ExecutableStreamsApp app) { + private static void clean(final ExecutableApp app) { app.createCleanUpRunner().clean(); } - private static void runApp(final ExecutableStreamsApp app) throws InterruptedException { + private static void run(final ExecutableApp app) throws InterruptedException { try (final StreamsRunner runner = app.createRunner()) { - run(runner); + RunStreamsAppTest.run(runner); // Wait until stream application has consumed all data delay(TIMEOUT_SECONDS, TimeUnit.SECONDS); } @@ -174,12 +175,12 @@ void shouldDeleteTopic() throws InterruptedException { new KeyValue<>("blub", 2L) ); - runApp(executableApp); + run(executableApp); this.assertContent(app.getTopics().getOutputTopic(), expectedValues, "WordCount contains all elements after first run"); delay(TIMEOUT_SECONDS, TimeUnit.SECONDS); - runCleanUpWithDeletion(executableApp); + clean(executableApp); this.softly.assertThat(this.kafkaCluster.exists(app.getTopics().getOutputTopic())) .as("Output topic is deleted") @@ -204,7 +205,7 @@ void shouldDeleteConsumerGroup() throws InterruptedException { new KeyValue<>("blub", 2L) ); - runApp(executableApp); + run(executableApp); this.assertContent(app.getTopics().getOutputTopic(), expectedValues, "WordCount contains all elements after first run"); @@ -218,7 +219,7 @@ void shouldDeleteConsumerGroup() throws InterruptedException { } delay(TIMEOUT_SECONDS, TimeUnit.SECONDS); - runCleanUpWithDeletion(executableApp); + clean(executableApp); try (final AdminClient adminClient = this.createAdminClient()) { this.softly.assertThat(adminClient.listConsumerGroups().all().get(TIMEOUT_SECONDS, TimeUnit.SECONDS)) @@ -248,7 +249,7 @@ void shouldNotThrowAnErrorIfConsumerGroupDoesNotExist() throws InterruptedExcept new KeyValue<>("blub", 2L) ); - runApp(executableApp); + run(executableApp); this.assertContent(app.getTopics().getOutputTopic(), expectedValues, "WordCount contains all elements after first run"); @@ -273,7 +274,7 @@ void shouldNotThrowAnErrorIfConsumerGroupDoesNotExist() throws InterruptedExcept } catch (final TimeoutException | ExecutionException e) { throw new RuntimeException("Error deleting consumer group", e); } - this.softly.assertThatCode(() -> runCleanUpWithDeletion(executableApp)).doesNotThrowAnyException(); + this.softly.assertThatCode(() -> clean(executableApp)).doesNotThrowAnyException(); } } @@ -293,7 +294,7 @@ void shouldDeleteInternalTopics() throws InterruptedException { .build(); this.kafkaCluster.send(sendRequest); - runApp(executableApp); + run(executableApp); final List inputTopics = app.getTopics().getInputTopics(); final String uniqueAppId = app.getUniqueAppId(); @@ -311,13 +312,13 @@ void shouldDeleteInternalTopics() throws InterruptedException { this.softly.assertThat(this.kafkaCluster.exists(manualTopic)).isTrue(); delay(TIMEOUT_SECONDS, TimeUnit.SECONDS); - runCleanUp(executableApp); + reset(executableApp); for (final String inputTopic : inputTopics) { this.softly.assertThat(this.kafkaCluster.exists(inputTopic)).isTrue(); } - this.softly.assertThat(this.kafkaCluster.exists(internalTopic)).isTrue(); - this.softly.assertThat(this.kafkaCluster.exists(backingTopic)).isTrue(); + this.softly.assertThat(this.kafkaCluster.exists(internalTopic)).isFalse(); + this.softly.assertThat(this.kafkaCluster.exists(backingTopic)).isFalse(); this.softly.assertThat(this.kafkaCluster.exists(manualTopic)).isTrue(); } } @@ -338,7 +339,7 @@ void shouldDeleteIntermediateTopics() throws InterruptedException { .build(); this.kafkaCluster.send(sendRequest); - runApp(executableApp); + run(executableApp); final List inputTopics = app.getTopics().getInputTopics(); final String manualTopic = ComplexTopologyApplication.THROUGH_TOPIC; @@ -349,7 +350,7 @@ void shouldDeleteIntermediateTopics() throws InterruptedException { this.softly.assertThat(this.kafkaCluster.exists(manualTopic)).isTrue(); delay(TIMEOUT_SECONDS, TimeUnit.SECONDS); - runCleanUpWithDeletion(executableApp); + clean(executableApp); for (final String inputTopic : inputTopics) { this.softly.assertThat(this.kafkaCluster.exists(inputTopic)).isTrue(); @@ -375,14 +376,14 @@ void shouldDeleteState() throws InterruptedException { new KeyValue<>("blub", 2L) ); - runApp(executableApp); + run(executableApp); this.assertContent(app.getTopics().getOutputTopic(), expectedValues, "All entries are once in the input topic after the 1st run"); delay(TIMEOUT_SECONDS, TimeUnit.SECONDS); - runCleanUp(executableApp); + reset(executableApp); - runApp(executableApp); + run(executableApp); final List> entriesTwice = expectedValues.stream() .flatMap(entry -> Stream.of(entry, entry)) .collect(Collectors.toList()); @@ -401,16 +402,16 @@ void shouldReprocessAlreadySeenRecords() throws InterruptedException { .useDefaults(); this.kafkaCluster.send(sendRequest); - runApp(executableApp); + run(executableApp); this.assertSize(app.getTopics().getOutputTopic(), 3); - runApp(executableApp); + run(executableApp); this.assertSize(app.getTopics().getOutputTopic(), 3); // Wait until all stream application are completely stopped before triggering cleanup delay(TIMEOUT_SECONDS, TimeUnit.SECONDS); - runCleanUp(executableApp); + reset(executableApp); - runApp(executableApp); + run(executableApp); this.assertSize(app.getTopics().getOutputTopic(), 6); } } @@ -419,8 +420,7 @@ void shouldReprocessAlreadySeenRecords() throws InterruptedException { void shouldDeleteValueSchema() throws InterruptedException, IOException, RestClientException { try (final ConfiguredStreamsApp app = createMirrorValueApplication(); - final ExecutableStreamsApp executableApp = app.withEndpoint( - this.createEndpoint()); + final ExecutableStreamsApp executableApp = app.withEndpoint(this.createEndpoint()); final SchemaRegistryClient client = this.schemaRegistryMockExtension.getSchemaRegistryClient()) { final TestRecord testRecord = TestRecord.newBuilder().setContent("key 1").build(); final String inputTopic = app.getTopics().getInputTopics().get(0); @@ -432,14 +432,14 @@ void shouldDeleteValueSchema() .build(); this.kafkaCluster.send(sendRequest); - runApp(executableApp); + run(executableApp); // Wait until all stream application are completely stopped before triggering cleanup delay(TIMEOUT_SECONDS, TimeUnit.SECONDS); final String outputTopic = app.getTopics().getOutputTopic(); this.softly.assertThat(client.getAllSubjects()) .contains(outputTopic + "-value", inputTopic + "-value"); - runCleanUpWithDeletion(executableApp); + clean(executableApp); this.softly.assertThat(client.getAllSubjects()) .doesNotContain(outputTopic + "-value") .contains(inputTopic + "-value"); @@ -450,8 +450,7 @@ void shouldDeleteValueSchema() void shouldDeleteKeySchema() throws InterruptedException, IOException, RestClientException { try (final ConfiguredStreamsApp app = createMirrorKeyApplication(); - final ExecutableStreamsApp executableApp = app.withEndpoint( - this.createEndpoint()); + final ExecutableStreamsApp executableApp = app.withEndpoint(this.createEndpoint()); final SchemaRegistryClient client = this.schemaRegistryMockExtension.getSchemaRegistryClient()) { final TestRecord testRecord = TestRecord.newBuilder().setContent("key 1").build(); final String inputTopic = app.getTopics().getInputTopics().get(0); @@ -463,14 +462,14 @@ void shouldDeleteKeySchema() .build(); this.kafkaCluster.send(sendRequest); - runApp(executableApp); + run(executableApp); // Wait until all stream application are completely stopped before triggering cleanup delay(TIMEOUT_SECONDS, TimeUnit.SECONDS); final String outputTopic = app.getTopics().getOutputTopic(); this.softly.assertThat(client.getAllSubjects()) .contains(outputTopic + "-key", inputTopic + "-key"); - runCleanUpWithDeletion(executableApp); + clean(executableApp); this.softly.assertThat(client.getAllSubjects()) .doesNotContain(outputTopic + "-key") .contains(inputTopic + "-key"); @@ -481,8 +480,7 @@ void shouldDeleteKeySchema() void shouldDeleteSchemaOfInternalTopics() throws InterruptedException, IOException, RestClientException { try (final ConfiguredStreamsApp app = this.createComplexApplication(); - final ExecutableStreamsApp executableApp = app.withEndpoint( - this.createEndpoint()); + final ExecutableStreamsApp executableApp = app.withEndpoint(this.createEndpoint()); final SchemaRegistryClient client = this.schemaRegistryMockExtension.getSchemaRegistryClient()) { final TestRecord testRecord = TestRecord.newBuilder().setContent("key 1").build(); final String inputTopic = app.getTopics().getInputTopics().get(0); @@ -495,7 +493,7 @@ void shouldDeleteSchemaOfInternalTopics() .build(); this.kafkaCluster.send(sendRequest); - runApp(executableApp); + run(executableApp); // Wait until all stream application are completely stopped before triggering cleanup delay(TIMEOUT_SECONDS, TimeUnit.SECONDS); @@ -508,7 +506,7 @@ void shouldDeleteSchemaOfInternalTopics() final String manualSubject = ComplexTopologyApplication.THROUGH_TOPIC + "-value"; this.softly.assertThat(client.getAllSubjects()) .contains(inputSubject, internalSubject, backingSubject, manualSubject); - runCleanUp(executableApp); + reset(executableApp); this.softly.assertThat(client.getAllSubjects()) .doesNotContain(internalSubject, backingSubject) @@ -521,8 +519,7 @@ void shouldDeleteSchemaOfInternalTopics() void shouldDeleteSchemaOfIntermediateTopics() throws InterruptedException, IOException, RestClientException { try (final ConfiguredStreamsApp app = this.createComplexApplication(); - final ExecutableStreamsApp executableApp = app.withEndpoint( - this.createEndpoint()); + final ExecutableStreamsApp executableApp = app.withEndpoint(this.createEndpoint()); final SchemaRegistryClient client = this.schemaRegistryMockExtension.getSchemaRegistryClient()) { final TestRecord testRecord = TestRecord.newBuilder().setContent("key 1").build(); final String inputTopic = app.getTopics().getInputTopics().get(0); @@ -535,7 +532,7 @@ void shouldDeleteSchemaOfIntermediateTopics() .build(); this.kafkaCluster.send(sendRequest); - runApp(executableApp); + run(executableApp); // Wait until all stream application are completely stopped before triggering cleanup delay(TIMEOUT_SECONDS, TimeUnit.SECONDS); @@ -544,7 +541,7 @@ void shouldDeleteSchemaOfIntermediateTopics() final String manualSubject = ComplexTopologyApplication.THROUGH_TOPIC + "-value"; this.softly.assertThat(client.getAllSubjects()) .contains(inputSubject, manualSubject); - runCleanUpWithDeletion(executableApp); + clean(executableApp); this.softly.assertThat(client.getAllSubjects()) .doesNotContain(manualSubject) @@ -555,9 +552,8 @@ void shouldDeleteSchemaOfIntermediateTopics() @Test void shouldCallCleanupHookForInternalTopics() { try (final ConfiguredStreamsApp app = this.createComplexCleanUpHookApplication(); - final ExecutableStreamsApp executableApp = app.withEndpoint( - this.createEndpoint())) { - runCleanUp(executableApp); + final ExecutableStreamsApp executableApp = app.withEndpoint(this.createEndpoint())) { + reset(executableApp); final String uniqueAppId = app.getUniqueAppId(); verify(this.topicHook).deleted(uniqueAppId + "-KSTREAM-AGGREGATE-STATE-STORE-0000000008-repartition"); verify(this.topicHook).deleted(uniqueAppId + "-KSTREAM-AGGREGATE-STATE-STORE-0000000008-changelog"); @@ -569,9 +565,8 @@ void shouldCallCleanupHookForInternalTopics() { @Test void shouldCallCleanUpHookForAllTopics() { try (final ConfiguredStreamsApp app = this.createComplexCleanUpHookApplication(); - final ExecutableStreamsApp executableApp = app.withEndpoint( - this.createEndpoint())) { - runCleanUp(executableApp); + final ExecutableStreamsApp executableApp = app.withEndpoint(this.createEndpoint())) { + clean(executableApp); final String uniqueAppId = app.getUniqueAppId(); verify(this.topicHook).deleted(uniqueAppId + "-KSTREAM-AGGREGATE-STATE-STORE-0000000008-repartition"); verify(this.topicHook).deleted(uniqueAppId + "-KSTREAM-AGGREGATE-STATE-STORE-0000000008-changelog"); @@ -585,12 +580,11 @@ void shouldCallCleanUpHookForAllTopics() { @Test void shouldNotThrowExceptionOnMissingInputTopic() throws InterruptedException { try (final ConfiguredStreamsApp app = createMirrorKeyApplication(); - final ExecutableStreamsApp executableApp = app.withEndpoint( - this.createEndpoint())) { + final ExecutableStreamsApp executableApp = app.withEndpoint(this.createEndpoint())) { // if we don't run the app, the coordinator will be unavailable - runApp(executableApp); + run(executableApp); delay(TIMEOUT_SECONDS, TimeUnit.SECONDS); - this.softly.assertThatCode(() -> runCleanUpWithDeletion(executableApp)).doesNotThrowAnyException(); + this.softly.assertThatCode(() -> clean(executableApp)).doesNotThrowAnyException(); } } @@ -600,11 +594,12 @@ void shouldThrowExceptionOnResetterError() throws InterruptedException { final ExecutableStreamsApp executableApp = app.withEndpoint( this.createEndpoint()); final StreamsRunner runner = executableApp.createRunner()) { - run(runner); + this.kafkaCluster.createTopic(TopicConfig.withName(app.getTopics().getInputTopics().get(0)).useDefaults()); + RunStreamsAppTest.run(runner); // Wait until stream application has consumed all data delay(TIMEOUT_SECONDS, TimeUnit.SECONDS); - //should throw exception because consumer group is still active - this.softly.assertThatThrownBy(() -> runCleanUpWithDeletion(executableApp)) + // should throw exception because consumer group is still active + this.softly.assertThatThrownBy(() -> reset(executableApp)) .isInstanceOf(CleanUpException.class) .hasMessageContaining("Error running streams resetter. Exit code 1"); } @@ -620,16 +615,16 @@ void shouldReprocessAlreadySeenRecordsWithPattern() throws InterruptedException this.kafkaCluster.send(SendValuesTransactional.inTransaction("another_topic", List.of("c")).useDefaults()); - runApp(executableApp); + run(executableApp); this.assertSize(app.getTopics().getOutputTopic(), 3); - runApp(executableApp); + run(executableApp); this.assertSize(app.getTopics().getOutputTopic(), 3); // Wait until all stream application are completely stopped before triggering cleanup delay(TIMEOUT_SECONDS, TimeUnit.SECONDS); - runCleanUp(executableApp); + reset(executableApp); - runApp(executableApp); + run(executableApp); this.assertSize(app.getTopics().getOutputTopic(), 6); } }