diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/util/TopicClient.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/util/TopicClient.java index 82c5ba58..dee9ec44 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/util/TopicClient.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/util/TopicClient.java @@ -259,9 +259,9 @@ public void createTopic(final String topicName, final TopicSettings settings, fi } catch (final TimeoutException ex) { throw failedToCreateTopic(topicName, ex); } - if (!this.exists(topicName)) { - throw new IllegalStateException(String.format("Creation of topic %s failed", topicName)); - } +// if (!this.exists(topicName)) { +// throw new IllegalStateException(String.format("Creation of topic %s failed", topicName)); +// } } /** diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/util/TopicClientTest.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/util/TopicClientTest.java index 204cf39d..fbb1ebf9 100644 --- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/util/TopicClientTest.java +++ b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/util/TopicClientTest.java @@ -33,6 +33,8 @@ import java.time.Duration; import java.util.Map; import org.apache.kafka.clients.admin.AdminClientConfig; +import org.awaitility.Awaitility; +import org.awaitility.core.ConditionFactory; import org.junit.jupiter.api.Test; import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Testcontainers; @@ -41,7 +43,7 @@ class TopicClientTest { @Container - private final ApacheKafkaContainerCluster kafkaCluster = new ApacheKafkaContainerCluster(KAFKA_VERSION, 3, 2); + private final ApacheKafkaContainerCluster kafkaCluster = new ApacheKafkaContainerCluster(KAFKA_VERSION, 2, 1); private static final Duration CLIENT_TIMEOUT = Duration.ofSeconds(10L); @@ -52,11 +54,16 @@ void shouldNotFindTopic() { } } + private static ConditionFactory await() { + return Awaitility.await().atMost(Duration.ofSeconds(5)); + } + @Test void shouldFindTopic() { try (final TopicClient client = this.createClient()) { client.createTopic("exists", defaultTopicSettings().build()); - assertThat(client.exists("exists")).isTrue(); + await() + .untilAsserted(() -> assertThat(client.exists("exists")).isTrue()); } } @@ -65,6 +72,11 @@ void shouldListTopics() { try (final TopicClient client = this.createClient()) { client.createTopic("foo", defaultTopicSettings().build()); client.createTopic("bar", defaultTopicSettings().build()); + await() + .untilAsserted(() -> { + assertThat(client.exists("foo")).isTrue(); + assertThat(client.exists("bar")).isTrue(); + }); assertThat(client.listTopics()) .hasSize(2) .containsExactlyInAnyOrder("foo", "bar"); @@ -75,6 +87,8 @@ void shouldListTopics() { void shouldDeleteTopic() { try (final TopicClient client = this.createClient()) { client.createTopic("foo", defaultTopicSettings().build()); + await() + .untilAsserted(() -> assertThat(client.exists("foo")).isTrue()); assertThat(client.exists("foo")).isTrue(); client.deleteTopic("foo"); assertThat(client.listTopics()) @@ -91,6 +105,8 @@ void shouldCreateTopic() { .replicationFactor((short) 2) .build(); client.createTopic("topic", settings, emptyMap()); + await() + .untilAsserted(() -> assertThat(client.exists("topic")).isTrue()); assertThat(client.exists("topic")).isTrue(); assertThat(client.describe("topic")) .satisfies(info -> { diff --git a/streams-bootstrap-core/src/testFixtures/java/com/bakdata/kafka/ApacheKafkaContainerCluster.java b/streams-bootstrap-core/src/testFixtures/java/com/bakdata/kafka/ApacheKafkaContainerCluster.java index 1ed9dcf6..1c5c4cec 100644 --- a/streams-bootstrap-core/src/testFixtures/java/com/bakdata/kafka/ApacheKafkaContainerCluster.java +++ b/streams-bootstrap-core/src/testFixtures/java/com/bakdata/kafka/ApacheKafkaContainerCluster.java @@ -112,10 +112,6 @@ public void start() { return Integer.parseInt(brokers); }, readyBrokers -> readyBrokers == this.brokersNum); - await() - .pollDelay(Duration.ofSeconds(10)) - .atMost(Duration.ofSeconds(11)) - .until(() -> true); } @Override