diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/util/TopicClient.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/util/TopicClient.java index a25006f7..82c5ba58 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/util/TopicClient.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/util/TopicClient.java @@ -46,7 +46,6 @@ import org.apache.kafka.common.KafkaFuture; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.TopicPartitionInfo; -import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; import org.jooq.lambda.Seq; /** @@ -180,12 +179,9 @@ public void close() { * @return whether a Kafka topic with the specified name exists or not */ public boolean exists(final String topicName) { - try { - this.getDescription(topicName); - return true; - } catch (final UnknownTopicOrPartitionException e) { - return false; - } + final Collection topics = this.listTopics(); + return topics.stream() + .anyMatch(t -> t.equals(topicName)); } /** diff --git a/streams-bootstrap-core/src/testFixtures/java/com/bakdata/kafka/KafkaTest.java b/streams-bootstrap-core/src/testFixtures/java/com/bakdata/kafka/KafkaTest.java index b0ca274d..92f91c57 100644 --- a/streams-bootstrap-core/src/testFixtures/java/com/bakdata/kafka/KafkaTest.java +++ b/streams-bootstrap-core/src/testFixtures/java/com/bakdata/kafka/KafkaTest.java @@ -45,12 +45,16 @@ public static KafkaContainer newCluster() { .withTag("3.8.1")); } - public static ConditionFactory awaitAtMost(final Duration timeout) { + private static ConditionFactory awaitAtMost(final Duration timeout) { return await() .pollInterval(Duration.ofSeconds(2L)) .atMost(timeout); } + private static String getUniqueAppId(final ExecutableStreamsApp app) { + return new ImprovedStreamsConfig(app.getConfig()).getAppId(); + } + protected KafkaEndpointConfig createEndpointWithoutSchemaRegistry() { return KafkaEndpointConfig.builder() .bootstrapServers(this.getBootstrapServers()) @@ -80,10 +84,6 @@ protected SchemaRegistryClient getSchemaRegistryClient() { return this.testTopologyFactory.getSchemaRegistryClient(); } - private static String getUniqueAppId(final ExecutableStreamsApp app) { - return new ImprovedStreamsConfig(app.getConfig()).getAppId(); - } - protected void awaitProcessing(final ExecutableStreamsApp app, final Duration timeout) { this.awaitActive(app, timeout); awaitAtMost(timeout)