Skip to content

Commit

Permalink
Debug
Browse files Browse the repository at this point in the history
  • Loading branch information
philipp94831 committed Jan 6, 2025
1 parent e82001a commit ce83a54
Showing 1 changed file with 21 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
import org.assertj.core.api.SoftAssertions;
import org.assertj.core.api.junit.jupiter.InjectSoftAssertions;
import org.assertj.core.api.junit.jupiter.SoftAssertionsExtension;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
Expand Down Expand Up @@ -136,6 +137,11 @@ private static void run(final ExecutableApp<StreamsRunner, ?, ?> app) throws Int
}
}

@BeforeEach
void setup() throws InterruptedException {
Thread.sleep(TIMEOUT.toMillis());
}

@Test
void shouldDeleteTopic() throws InterruptedException {
try (final ConfiguredStreamsApp<StreamsApp> app = createWordCountApplication();
Expand Down Expand Up @@ -201,17 +207,19 @@ void shouldDeleteConsumerGroup() throws InterruptedException {
this.assertContent(app.getTopics().getOutputTopic(), expectedValues,
"WordCount contains all elements after first run");

try (final ConsumerGroupClient adminClient = this.createAdminClient().getConsumerGroupClient()) {
this.softly.assertThat(adminClient.exists(app.getUniqueAppId()))
try (final ImprovedAdminClient adminClient = this.createAdminClient();
final ConsumerGroupClient consumerGroupClient = adminClient.getConsumerGroupClient()) {
this.softly.assertThat(consumerGroupClient.exists(app.getUniqueAppId()))
.as("Consumer group exists")
.isTrue();
}

Thread.sleep(TIMEOUT.toMillis());
clean(executableApp);

try (final ConsumerGroupClient adminClient = this.createAdminClient().getConsumerGroupClient()) {
this.softly.assertThat(adminClient.exists(app.getUniqueAppId()))
try (final ImprovedAdminClient adminClient = this.createAdminClient();
final ConsumerGroupClient consumerGroupClient = adminClient.getConsumerGroupClient()) {
this.softly.assertThat(consumerGroupClient.exists(app.getUniqueAppId()))
.as("Consumer group is deleted")
.isFalse();
}
Expand Down Expand Up @@ -245,17 +253,19 @@ void shouldNotThrowAnErrorIfConsumerGroupDoesNotExist() throws InterruptedExcept
this.assertContent(app.getTopics().getOutputTopic(), expectedValues,
"WordCount contains all elements after first run");

try (final ConsumerGroupClient adminClient = this.createAdminClient().getConsumerGroupClient()) {
this.softly.assertThat(adminClient.exists(app.getUniqueAppId()))
try (final ImprovedAdminClient adminClient = this.createAdminClient();
final ConsumerGroupClient consumerGroupClient = adminClient.getConsumerGroupClient()) {
this.softly.assertThat(consumerGroupClient.exists(app.getUniqueAppId()))
.as("Consumer group exists")
.isTrue();
}

Thread.sleep(TIMEOUT.toMillis());

try (final ConsumerGroupClient adminClient = this.createAdminClient().getConsumerGroupClient()) {
adminClient.deleteConsumerGroup(app.getUniqueAppId());
this.softly.assertThat(adminClient.exists(app.getUniqueAppId()))
try (final ImprovedAdminClient adminClient = this.createAdminClient();
final ConsumerGroupClient consumerGroupClient = adminClient.getConsumerGroupClient()) {
consumerGroupClient.deleteConsumerGroup(app.getUniqueAppId());
this.softly.assertThat(consumerGroupClient.exists(app.getUniqueAppId()))
.as("Consumer group is deleted")
.isFalse();
}
Expand Down Expand Up @@ -591,6 +601,7 @@ void shouldCallCleanupHookForInternalTopics() {
verify(this.topicHook).deleted(uniqueAppId + "-KSTREAM-AGGREGATE-STATE-STORE-0000000008-repartition");
verify(this.topicHook).deleted(uniqueAppId + "-KSTREAM-AGGREGATE-STATE-STORE-0000000008-changelog");
verify(this.topicHook).deleted(uniqueAppId + "-KSTREAM-REDUCE-STATE-STORE-0000000003-changelog");
verify(this.topicHook).close();
verifyNoMoreInteractions(this.topicHook);
}
}
Expand All @@ -606,6 +617,7 @@ void shouldCallCleanUpHookForAllTopics() {
verify(this.topicHook).deleted(uniqueAppId + "-KSTREAM-REDUCE-STATE-STORE-0000000003-changelog");
verify(this.topicHook).deleted(ComplexTopologyApplication.THROUGH_TOPIC);
verify(this.topicHook).deleted(app.getTopics().getOutputTopic());
verify(this.topicHook).close();
verifyNoMoreInteractions(this.topicHook);
}
}
Expand Down

0 comments on commit ce83a54

Please sign in to comment.