Skip to content

Commit

Permalink
Test topic client using multiple brokers
Browse files Browse the repository at this point in the history
  • Loading branch information
philipp94831 committed Jan 30, 2025
1 parent c946df5 commit 73a3c91
Show file tree
Hide file tree
Showing 6 changed files with 21 additions and 11 deletions.
1 change: 1 addition & 0 deletions build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ allprojects {
mavenCentral()
maven(url = "https://packages.confluent.io/maven/")
maven(url = "https://s01.oss.sonatype.org/content/repositories/snapshots")
maven(url = "https://jitpack.io")
}
}

Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ version=3.5.2-SNAPSHOT
org.gradle.caching=true
org.gradle.parallel=true
kafkaVersion=3.8.1
testContainersVersion=1.20.4
testContainersVersion=1404c44
confluentVersion=7.8.0
fluentKafkaVersion=3.0.0
junitVersion=5.11.4
Expand Down
12 changes: 10 additions & 2 deletions streams-bootstrap-core/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,16 @@ dependencies {

testFixturesApi(project(":streams-bootstrap-test"))
val testContainersVersion: String by project
testFixturesApi(group = "org.testcontainers", name = "junit-jupiter", version = testContainersVersion)
testFixturesApi(group = "org.testcontainers", name = "kafka", version = testContainersVersion)
testFixturesApi(
group = "com.github.testcontainers.testcontainers-java",
name = "junit-jupiter",
version = testContainersVersion
)
testFixturesApi(
group = "com.github.testcontainers.testcontainers-java",
name = "kafka",
version = testContainersVersion
)
testFixturesImplementation(group = "org.assertj", name = "assertj-core", version = assertJVersion)
testImplementation(group = "io.confluent", name = "kafka-streams-avro-serde", version = confluentVersion)
val log4jVersion: String by project
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

package com.bakdata.kafka.util;

import static com.bakdata.kafka.KafkaTest.KAFKA_VERSION;
import static com.bakdata.kafka.KafkaTestClient.defaultTopicSettings;
import static java.util.Collections.emptyMap;
import static org.assertj.core.api.Assertions.assertThat;
Expand All @@ -40,7 +41,7 @@
class TopicClientTest {

@Container
private final ApacheKafkaContainerCluster kafkaCluster = new ApacheKafkaContainerCluster("3.8.0", 3, 2);
private final ApacheKafkaContainerCluster kafkaCluster = new ApacheKafkaContainerCluster(KAFKA_VERSION, 3, 2);

private static final Duration CLIENT_TIMEOUT = Duration.ofSeconds(10L);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.Collection;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import lombok.Getter;
import org.apache.kafka.common.Uuid;
import org.testcontainers.containers.Container;
import org.testcontainers.containers.GenericContainer;
Expand All @@ -46,6 +47,7 @@ public class ApacheKafkaContainerCluster implements Startable {

private final Network network;

@Getter
private final Collection<KafkaContainer> brokers;

public ApacheKafkaContainerCluster(final String version, final int brokersNum, final int internalTopicsRf) {
Expand All @@ -61,8 +63,7 @@ public ApacheKafkaContainerCluster(final String version, final int brokersNum, f
this.brokersNum = brokersNum;
this.network = Network.newNetwork();

final String controllerQuorumVoters = IntStream
.range(0, brokersNum)
final String controllerQuorumVoters = IntStream.range(0, brokersNum)
.mapToObj(brokerNum -> String.format("%d@broker-%d:9094", brokerNum, brokerNum))
.collect(Collectors.joining(","));

Expand All @@ -86,10 +87,6 @@ public ApacheKafkaContainerCluster(final String version, final int brokersNum, f
.collect(Collectors.toList());
}

public Collection<KafkaContainer> getBrokers() {
return this.brokers;
}

public String getBootstrapServers() {
return this.brokers.stream().map(KafkaContainer::getBootstrapServers).collect(Collectors.joining(","));
}
Expand Down Expand Up @@ -121,5 +118,7 @@ public void start() {
@Override
public void stop() {
this.brokers.stream().parallel().forEach(GenericContainer::stop);

this.network.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,14 @@
@Testcontainers
public abstract class KafkaTest {
protected static final Duration POLL_TIMEOUT = Duration.ofSeconds(10);
public static final String KAFKA_VERSION = "3.8.1";
private final TestTopologyFactory testTopologyFactory = TestTopologyFactory.withSchemaRegistry();
@Container
private final KafkaContainer kafkaCluster = newCluster();

public static KafkaContainer newCluster() {
return new KafkaContainer(DockerImageName.parse("apache/kafka-native")
.withTag("3.8.1"));
.withTag(KAFKA_VERSION));
}

private static ConditionFactory await() {
Expand Down

0 comments on commit 73a3c91

Please sign in to comment.