Skip to content

Commit

Permalink
Update
Browse files Browse the repository at this point in the history
  • Loading branch information
philipp94831 committed Jan 17, 2025
1 parent 857a910 commit 83fc24c
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.jooq.lambda.Seq;

/**
Expand Down Expand Up @@ -180,12 +179,9 @@ public void close() {
* @return whether a Kafka topic with the specified name exists or not
*/
public boolean exists(final String topicName) {
try {
this.getDescription(topicName);
return true;
} catch (final UnknownTopicOrPartitionException e) {
return false;
}
final Collection<String> topics = this.listTopics();
return topics.stream()
.anyMatch(t -> t.equals(topicName));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,16 @@ public static KafkaContainer newCluster() {
.withTag("3.8.1"));
}

public static ConditionFactory awaitAtMost(final Duration timeout) {
private static ConditionFactory awaitAtMost(final Duration timeout) {
return await()
.pollInterval(Duration.ofSeconds(2L))
.atMost(timeout);
}

private static String getUniqueAppId(final ExecutableStreamsApp<?> app) {
return new ImprovedStreamsConfig(app.getConfig()).getAppId();
}

protected KafkaEndpointConfig createEndpointWithoutSchemaRegistry() {
return KafkaEndpointConfig.builder()
.bootstrapServers(this.getBootstrapServers())
Expand Down Expand Up @@ -80,10 +84,6 @@ protected SchemaRegistryClient getSchemaRegistryClient() {
return this.testTopologyFactory.getSchemaRegistryClient();
}

private static String getUniqueAppId(final ExecutableStreamsApp<?> app) {
return new ImprovedStreamsConfig(app.getConfig()).getAppId();
}

protected void awaitProcessing(final ExecutableStreamsApp<?> app, final Duration timeout) {
this.awaitActive(app, timeout);
awaitAtMost(timeout)
Expand Down

0 comments on commit 83fc24c

Please sign in to comment.