Skip to content

Commit

Permalink
Test topic client using multiple brokers
Browse files Browse the repository at this point in the history
  • Loading branch information
philipp94831 committed Jan 30, 2025
1 parent 6886227 commit d40ffef
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -259,9 +259,9 @@ public void createTopic(final String topicName, final TopicSettings settings, fi
} catch (final TimeoutException ex) {
throw failedToCreateTopic(topicName, ex);
}
if (!this.exists(topicName)) {
throw new IllegalStateException(String.format("Creation of topic %s failed", topicName));
}
// if (!this.exists(topicName)) {
// throw new IllegalStateException(String.format("Creation of topic %s failed", topicName));
// }
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
import java.time.Duration;
import java.util.Map;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.awaitility.Awaitility;
import org.awaitility.core.ConditionFactory;
import org.junit.jupiter.api.Test;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
Expand All @@ -41,7 +43,7 @@
class TopicClientTest {

@Container
private final ApacheKafkaContainerCluster kafkaCluster = new ApacheKafkaContainerCluster(KAFKA_VERSION, 3, 2);
private final ApacheKafkaContainerCluster kafkaCluster = new ApacheKafkaContainerCluster(KAFKA_VERSION, 2, 1);

private static final Duration CLIENT_TIMEOUT = Duration.ofSeconds(10L);

Expand All @@ -52,11 +54,16 @@ void shouldNotFindTopic() {
}
}

private static ConditionFactory await() {
return Awaitility.await().atMost(Duration.ofSeconds(5));
}

@Test
void shouldFindTopic() {
try (final TopicClient client = this.createClient()) {
client.createTopic("exists", defaultTopicSettings().build());
assertThat(client.exists("exists")).isTrue();
await()
.untilAsserted(() -> assertThat(client.exists("exists")).isTrue());
}
}

Expand All @@ -65,6 +72,11 @@ void shouldListTopics() {
try (final TopicClient client = this.createClient()) {
client.createTopic("foo", defaultTopicSettings().build());
client.createTopic("bar", defaultTopicSettings().build());
await()
.untilAsserted(() -> {
assertThat(client.exists("foo")).isTrue();
assertThat(client.exists("bar")).isTrue();
});
assertThat(client.listTopics())
.hasSize(2)
.containsExactlyInAnyOrder("foo", "bar");
Expand All @@ -75,6 +87,8 @@ void shouldListTopics() {
void shouldDeleteTopic() {
try (final TopicClient client = this.createClient()) {
client.createTopic("foo", defaultTopicSettings().build());
await()
.untilAsserted(() -> assertThat(client.exists("foo")).isTrue());
assertThat(client.exists("foo")).isTrue();
client.deleteTopic("foo");
assertThat(client.listTopics())
Expand All @@ -91,6 +105,8 @@ void shouldCreateTopic() {
.replicationFactor((short) 2)
.build();
client.createTopic("topic", settings, emptyMap());
await()
.untilAsserted(() -> assertThat(client.exists("topic")).isTrue());
assertThat(client.exists("topic")).isTrue();
assertThat(client.describe("topic"))
.satisfies(info -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,10 +112,6 @@ public void start() {

return Integer.parseInt(brokers);
}, readyBrokers -> readyBrokers == this.brokersNum);
await()
.pollDelay(Duration.ofSeconds(10))
.atMost(Duration.ofSeconds(11))
.until(() -> true);
}

@Override
Expand Down

0 comments on commit d40ffef

Please sign in to comment.