diff --git a/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/CliTest.java b/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/CliTest.java index 7912746e..69331c02 100644 --- a/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/CliTest.java +++ b/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/CliTest.java @@ -24,12 +24,11 @@ package com.bakdata.kafka; -import static com.bakdata.kafka.KafkaContainerHelper.DEFAULT_TOPIC_SETTINGS; -import static com.bakdata.kafka.TestUtil.newKafkaCluster; -import static java.util.Collections.emptyMap; +import static com.bakdata.kafka.KafkaTest.newCluster; +import static com.bakdata.kafka.KafkaTest.newTestClient; import static org.assertj.core.api.Assertions.assertThat; -import com.bakdata.kafka.util.ImprovedAdminClient; +import com.bakdata.kafka.SenderBuilder.SimpleProducerRecord; import com.ginsberg.junit.exit.ExpectSystemExitWithStatus; import java.time.Duration; import java.util.List; @@ -37,7 +36,6 @@ import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.Serdes.StringSerde; -import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.kstream.Consumed; import org.junit.jupiter.api.Test; import org.testcontainers.kafka.KafkaContainer; @@ -214,7 +212,7 @@ public SerdeConfig defaultSerializationConfig() { @ExpectSystemExitWithStatus(1) void shouldExitWithErrorInTopology() throws InterruptedException { final String input = "input"; - try (final KafkaContainer kafkaCluster = newKafkaCluster(); + try (final KafkaContainer kafkaCluster = newCluster(); final KafkaStreamsApplication app = new SimpleKafkaStreamsApplication<>(() -> new StreamsApp() { @Override public void buildTopology(final TopologyBuilder builder) { @@ -240,8 +238,8 @@ public SerdeConfig defaultSerializationConfig() { "--bootstrap-server", kafkaCluster.getBootstrapServers(), "--input-topics", input ); - new KafkaContainerHelper(kafkaCluster).send() - .to(input, List.of(new KeyValue<>("foo", "bar"))); + newTestClient(kafkaCluster).send() + .to(input, List.of(new SimpleProducerRecord<>("foo", "bar"))); Thread.sleep(Duration.ofSeconds(10).toMillis()); } } @@ -251,7 +249,7 @@ public SerdeConfig defaultSerializationConfig() { void shouldExitWithSuccessCodeOnShutdown() { final String input = "input"; final String output = "output"; - try (final KafkaContainer kafkaCluster = newKafkaCluster(); + try (final KafkaContainer kafkaCluster = newCluster(); final KafkaStreamsApplication app = new SimpleKafkaStreamsApplication<>(() -> new StreamsApp() { @Override public void buildTopology(final TopologyBuilder builder) { @@ -270,19 +268,17 @@ public SerdeConfig defaultSerializationConfig() { } })) { kafkaCluster.start(); - final KafkaContainerHelper kafkaContainerHelper = new KafkaContainerHelper(kafkaCluster); - try (final ImprovedAdminClient admin = kafkaContainerHelper.admin()) { - admin.getTopicClient().createTopic(output, DEFAULT_TOPIC_SETTINGS, emptyMap()); - } + final KafkaTestClient testClient = newTestClient(kafkaCluster); + testClient.createTopic(output); runApp(app, "--bootstrap-server", kafkaCluster.getBootstrapServers(), "--input-topics", input, "--output-topic", output ); - kafkaContainerHelper.send() - .to(input, List.of(new KeyValue<>("foo", "bar"))); - final List> keyValues = kafkaContainerHelper.read() + testClient.send() + .to(input, List.of(new SimpleProducerRecord<>("foo", "bar"))); + final List> keyValues = testClient.read() .from(output, Duration.ofSeconds(10)); assertThat(keyValues) .hasSize(1) diff --git a/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/RunProducerAppTest.java b/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/RunProducerAppTest.java index 40bff219..1c194e19 100644 --- a/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/RunProducerAppTest.java +++ b/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/RunProducerAppTest.java @@ -24,11 +24,11 @@ package com.bakdata.kafka.integration; -import static com.bakdata.kafka.TestUtil.newKafkaCluster; import static org.assertj.core.api.Assertions.assertThat; -import com.bakdata.kafka.KafkaContainerHelper; import com.bakdata.kafka.KafkaProducerApplication; +import com.bakdata.kafka.KafkaTest; +import com.bakdata.kafka.KafkaTestClient; import com.bakdata.kafka.ProducerApp; import com.bakdata.kafka.ProducerBuilder; import com.bakdata.kafka.ProducerRunnable; @@ -45,29 +45,10 @@ import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.testcontainers.junit.jupiter.Container; -import org.testcontainers.junit.jupiter.Testcontainers; -import org.testcontainers.kafka.KafkaContainer; -@Testcontainers -class RunProducerAppTest { +class RunProducerAppTest extends KafkaTest { private static final Duration TIMEOUT = Duration.ofSeconds(10); - private static final String SCHEMA_REGISTRY_URL = "mock://"; - @Container - private final KafkaContainer kafkaCluster = newKafkaCluster(); - - @BeforeEach - void setup() { - this.kafkaCluster.start(); - } - - @AfterEach - void tearDown() { - this.kafkaCluster.stop(); - } @Test void shouldRunApp() throws InterruptedException { @@ -88,15 +69,16 @@ public SerializerConfig defaultSerializationConfig() { return new SerializerConfig(StringSerializer.class, SpecificAvroSerializer.class); } })) { - app.setBootstrapServers(this.kafkaCluster.getBootstrapServers()); - app.setSchemaRegistryUrl(SCHEMA_REGISTRY_URL); + app.setBootstrapServers(this.getBootstrapServers()); + final String schemaRegistryUrl = this.getSchemaRegistryUrl(); + app.setSchemaRegistryUrl(schemaRegistryUrl); app.setOutputTopic(output); app.run(); - final KafkaContainerHelper kafkaContainerHelper = new KafkaContainerHelper(this.kafkaCluster); - assertThat(kafkaContainerHelper.read() + final KafkaTestClient testClient = this.newTestClient(); + assertThat(testClient.read() .with(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class) .with(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, SpecificAvroDeserializer.class) - .with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, SCHEMA_REGISTRY_URL) + .with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl) .from(output, TIMEOUT)) .hasSize(1) .anySatisfy(kv -> { @@ -105,7 +87,7 @@ public SerializerConfig defaultSerializationConfig() { }); app.clean(); Thread.sleep(TIMEOUT.toMillis()); - try (final ImprovedAdminClient admin = kafkaContainerHelper.admin()) { + try (final ImprovedAdminClient admin = testClient.admin()) { assertThat(admin.getTopicClient().exists(app.getOutputTopic())) .as("Output topic is deleted") .isFalse(); diff --git a/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/RunStreamsAppTest.java b/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/RunStreamsAppTest.java index c1b46273..120c6b57 100644 --- a/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/RunStreamsAppTest.java +++ b/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/RunStreamsAppTest.java @@ -24,16 +24,14 @@ package com.bakdata.kafka.integration; -import static com.bakdata.kafka.KafkaContainerHelper.DEFAULT_TOPIC_SETTINGS; -import static com.bakdata.kafka.TestUtil.newKafkaCluster; -import static java.util.Collections.emptyMap; import static org.assertj.core.api.Assertions.assertThat; -import com.bakdata.kafka.KafkaContainerHelper; import com.bakdata.kafka.KafkaStreamsApplication; +import com.bakdata.kafka.KafkaTest; +import com.bakdata.kafka.KafkaTestClient; +import com.bakdata.kafka.SenderBuilder.SimpleProducerRecord; import com.bakdata.kafka.SimpleKafkaStreamsApplication; import com.bakdata.kafka.test_applications.Mirror; -import com.bakdata.kafka.util.ImprovedAdminClient; import java.time.Duration; import java.util.List; import java.util.Map; @@ -41,31 +39,22 @@ import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; -import org.apache.kafka.streams.KeyValue; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.junit.jupiter.MockitoExtension; -import org.testcontainers.junit.jupiter.Container; -import org.testcontainers.junit.jupiter.Testcontainers; -import org.testcontainers.kafka.KafkaContainer; -@Testcontainers @ExtendWith(MockitoExtension.class) -class RunStreamsAppTest { +class RunStreamsAppTest extends KafkaTest { private static final Duration TIMEOUT = Duration.ofSeconds(10); - @Container - private final KafkaContainer kafkaCluster = newKafkaCluster(); @Test void shouldRunApp() { final String input = "input"; final String output = "output"; - final KafkaContainerHelper kafkaContainerHelper = new KafkaContainerHelper(this.kafkaCluster); - try (final ImprovedAdminClient admin = kafkaContainerHelper.admin()) { - admin.getTopicClient().createTopic(output, DEFAULT_TOPIC_SETTINGS, emptyMap()); - } + final KafkaTestClient testClient = this.newTestClient(); + testClient.createTopic(output); try (final KafkaStreamsApplication app = new SimpleKafkaStreamsApplication<>(Mirror::new)) { - app.setBootstrapServers(this.kafkaCluster.getBootstrapServers()); + app.setBootstrapServers(this.getBootstrapServers()); app.setKafkaConfig(Map.of( ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000" )); @@ -73,11 +62,11 @@ void shouldRunApp() { app.setOutputTopic(output); // run in Thread because the application blocks indefinitely new Thread(app).start(); - kafkaContainerHelper.send() + testClient.send() .with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class) .with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class) - .to(input, List.of(new KeyValue<>("foo", "bar"))); - assertThat(kafkaContainerHelper.read() + .to(input, List.of(new SimpleProducerRecord<>("foo", "bar"))); + assertThat(testClient.read() .with(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class) .with(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class) .from(output, TIMEOUT)) diff --git a/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpTest.java b/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpTest.java index 5f8cd2b5..83f807d3 100644 --- a/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpTest.java +++ b/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpTest.java @@ -25,13 +25,11 @@ package com.bakdata.kafka.integration; -import static com.bakdata.kafka.KafkaContainerHelper.DEFAULT_TOPIC_SETTINGS; -import static com.bakdata.kafka.TestUtil.newKafkaCluster; -import static java.util.Collections.emptyMap; - import com.bakdata.kafka.CloseFlagApp; -import com.bakdata.kafka.KafkaContainerHelper; import com.bakdata.kafka.KafkaStreamsApplication; +import com.bakdata.kafka.KafkaTest; +import com.bakdata.kafka.KafkaTestClient; +import com.bakdata.kafka.SenderBuilder.SimpleProducerRecord; import com.bakdata.kafka.SimpleKafkaStreamsApplication; import com.bakdata.kafka.test_applications.WordCount; import com.bakdata.kafka.util.ImprovedAdminClient; @@ -54,19 +52,13 @@ import org.mockito.junit.jupiter.MockitoExtension; import org.mockito.junit.jupiter.MockitoSettings; import org.mockito.quality.Strictness; -import org.testcontainers.junit.jupiter.Container; -import org.testcontainers.junit.jupiter.Testcontainers; -import org.testcontainers.kafka.KafkaContainer; -@Testcontainers @Slf4j @ExtendWith(SoftAssertionsExtension.class) @ExtendWith(MockitoExtension.class) @MockitoSettings(strictness = Strictness.STRICT_STUBS) -class StreamsCleanUpTest { +class StreamsCleanUpTest extends KafkaTest { private static final Duration TIMEOUT = Duration.ofSeconds(10); - @Container - private final KafkaContainer kafkaCluster = newKafkaCluster(); @InjectSoftAssertions private SoftAssertions softly; @@ -85,15 +77,13 @@ private static void runApp(final KafkaStreamsApplication app) throws Interrup @Test void shouldClean() throws InterruptedException { try (final KafkaStreamsApplication app = this.createWordCountApplication()) { - final KafkaContainerHelper kafkaContainerHelper = new KafkaContainerHelper(this.kafkaCluster); - try (final ImprovedAdminClient admin = new KafkaContainerHelper(this.kafkaCluster).admin()) { - admin.getTopicClient().createTopic(app.getOutputTopic(), DEFAULT_TOPIC_SETTINGS, emptyMap()); - } - kafkaContainerHelper.send() + final KafkaTestClient testClient = this.newTestClient(); + testClient.createTopic(app.getOutputTopic()); + testClient.send() .to(app.getInputTopics().get(0), List.of( - new KeyValue<>(null, "blub"), - new KeyValue<>(null, "bla"), - new KeyValue<>(null, "blub") + new SimpleProducerRecord<>(null, "blub"), + new SimpleProducerRecord<>(null, "bla"), + new SimpleProducerRecord<>(null, "blub") )); final List> expectedValues = List.of( @@ -107,15 +97,13 @@ void shouldClean() throws InterruptedException { Thread.sleep(TIMEOUT.toMillis()); app.clean(); - try (final ImprovedAdminClient admin = kafkaContainerHelper.admin()) { + try (final ImprovedAdminClient admin = testClient.admin()) { this.softly.assertThat(admin.getTopicClient().exists(app.getOutputTopic())) .as("Output topic is deleted") .isFalse(); } - try (final ImprovedAdminClient admin = new KafkaContainerHelper(this.kafkaCluster).admin()) { - admin.getTopicClient().createTopic(app.getOutputTopic(), DEFAULT_TOPIC_SETTINGS, emptyMap()); - } + testClient.createTopic(app.getOutputTopic()); this.runAndAssertContent(expectedValues, "All entries are once in the input topic after the 2nd run", app); } } @@ -123,15 +111,13 @@ void shouldClean() throws InterruptedException { @Test void shouldReset() throws InterruptedException { try (final KafkaStreamsApplication app = this.createWordCountApplication()) { - final KafkaContainerHelper kafkaContainerHelper = new KafkaContainerHelper(this.kafkaCluster); - try (final ImprovedAdminClient admin = new KafkaContainerHelper(this.kafkaCluster).admin()) { - admin.getTopicClient().createTopic(app.getOutputTopic(), DEFAULT_TOPIC_SETTINGS, emptyMap()); - } - kafkaContainerHelper.send() + final KafkaTestClient testClient = this.newTestClient(); + testClient.createTopic(app.getOutputTopic()); + testClient.send() .to(app.getInputTopics().get(0), List.of( - new KeyValue<>(null, "blub"), - new KeyValue<>(null, "bla"), - new KeyValue<>(null, "blub") + new SimpleProducerRecord<>(null, "blub"), + new SimpleProducerRecord<>(null, "bla"), + new SimpleProducerRecord<>(null, "blub") )); final List> expectedValues = List.of( @@ -145,7 +131,7 @@ void shouldReset() throws InterruptedException { Thread.sleep(TIMEOUT.toMillis()); app.reset(); - try (final ImprovedAdminClient admin = kafkaContainerHelper.admin()) { + try (final ImprovedAdminClient admin = testClient.admin()) { this.softly.assertThat(admin.getTopicClient().exists(app.getOutputTopic())) .as("Output topic exists") .isTrue(); @@ -161,9 +147,7 @@ void shouldReset() throws InterruptedException { @Test void shouldCallClose() throws InterruptedException { try (final CloseFlagApp app = this.createCloseFlagApplication()) { - try (final ImprovedAdminClient admin = new KafkaContainerHelper(this.kafkaCluster).admin()) { - admin.getTopicClient().createTopic(app.getInputTopics().get(0), DEFAULT_TOPIC_SETTINGS, emptyMap()); - } + this.newTestClient().createTopic(app.getInputTopics().get(0)); Thread.sleep(TIMEOUT.toMillis()); this.softly.assertThat(app.isClosed()).isFalse(); this.softly.assertThat(app.isAppClosed()).isFalse(); @@ -184,7 +168,7 @@ private CloseFlagApp createCloseFlagApplication() { } private List> readOutputTopic(final String outputTopic) { - final List> records = new KafkaContainerHelper(this.kafkaCluster).read() + final List> records = this.newTestClient().read() .with(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class) .from(outputTopic, TIMEOUT); return records.stream() @@ -211,7 +195,7 @@ private KafkaStreamsApplication createWordCountApplication() { } private > T configure(final T application) { - application.setBootstrapServers(this.kafkaCluster.getBootstrapServers()); + application.setBootstrapServers(this.getBootstrapServers()); application.setKafkaConfig(Map.of( StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, "0", ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000" diff --git a/streams-bootstrap-core/build.gradle.kts b/streams-bootstrap-core/build.gradle.kts index c85e0870..77d4a1f5 100644 --- a/streams-bootstrap-core/build.gradle.kts +++ b/streams-bootstrap-core/build.gradle.kts @@ -31,7 +31,7 @@ dependencies { testImplementation(group = "org.mockito", name = "mockito-core", version = mockitoVersion) testImplementation(group = "org.mockito", name = "mockito-junit-jupiter", version = mockitoVersion) - testImplementation(project(":streams-bootstrap-test")) + testFixturesApi(project(":streams-bootstrap-test")) val testContainersVersion: String by project testFixturesApi(group = "org.testcontainers", name = "junit-jupiter", version = testContainersVersion) testFixturesApi(group = "org.testcontainers", name = "kafka", version = testContainersVersion) diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/util/TopicClient.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/util/TopicClient.java index 318135ac..9be1f987 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/util/TopicClient.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/util/TopicClient.java @@ -24,6 +24,8 @@ package com.bakdata.kafka.util; +import static java.util.Collections.emptyMap; + import java.time.Duration; import java.util.Collection; import java.util.List; @@ -102,6 +104,18 @@ public void createIfNotExists(final String topicName, final TopicSettings settin } } + /** + * Creates a new Kafka topic with the specified number of partitions if it does not yet exist. + * + * @param topicName the topic name + * @param settings settings for number of partitions and replicationFactor + * @see #createTopic(String, TopicSettings, Map) + * @see #exists(String) + */ + public void createIfNotExists(final String topicName, final TopicSettings settings) { + this.createIfNotExists(topicName, settings, emptyMap()); + } + /** * Delete a Kafka topic. * @@ -225,6 +239,16 @@ public void createTopic(final String topicName, final TopicSettings settings, fi } } + /** + * Creates a new Kafka topic with the specified number of partitions. + * + * @param topicName the topic name + * @param settings settings for number of partitions and replicationFactor + */ + public void createTopic(final String topicName, final TopicSettings settings) { + this.createTopic(topicName, settings); + } + /** * List Kafka topics. * diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/ProducerCleanUpRunnerTest.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/ProducerCleanUpRunnerTest.java index 155890b0..325479e8 100644 --- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/ProducerCleanUpRunnerTest.java +++ b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/ProducerCleanUpRunnerTest.java @@ -35,6 +35,7 @@ import com.bakdata.kafka.ExecutableApp; import com.bakdata.kafka.ExecutableProducerApp; import com.bakdata.kafka.HasTopicHooks.TopicHook; +import com.bakdata.kafka.KafkaTest; import com.bakdata.kafka.ProducerApp; import com.bakdata.kafka.ProducerCleanUpConfiguration; import com.bakdata.kafka.ProducerTopicConfig; @@ -109,7 +110,7 @@ void shouldDeleteTopic() { clean(executableApp); - try (final ImprovedAdminClient admin = this.newContainerHelper().admin()) { + try (final ImprovedAdminClient admin = this.newTestClient().admin()) { this.softly.assertThat(admin.getTopicClient().exists(app.getTopics().getOutputTopic())) .as("Output topic is deleted") .isFalse(); @@ -174,7 +175,7 @@ public ProducerCleanUpConfiguration setupCleanUp( private List> readOutputTopic(final String outputTopic) { final List> records = - this.newContainerHelper().read().from(outputTopic, Duration.ofSeconds(1L)); + this.newTestClient().read().from(outputTopic, Duration.ofSeconds(1L)); return records.stream() .map(StreamsCleanUpRunnerTest::toKeyValue) .collect(Collectors.toList()); diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/ProducerRunnerTest.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/ProducerRunnerTest.java index 7653a83c..dfd480b0 100644 --- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/ProducerRunnerTest.java +++ b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/ProducerRunnerTest.java @@ -28,6 +28,7 @@ import com.bakdata.kafka.AppConfiguration; import com.bakdata.kafka.ConfiguredProducerApp; +import com.bakdata.kafka.KafkaTest; import com.bakdata.kafka.ProducerApp; import com.bakdata.kafka.ProducerRunner; import com.bakdata.kafka.ProducerTopicConfig; @@ -72,7 +73,7 @@ void shouldRunApp() { private List> readOutputTopic(final String outputTopic) { final List> records = - this.newContainerHelper().read().from(outputTopic, Duration.ofSeconds(1L)); + this.newTestClient().read().from(outputTopic, Duration.ofSeconds(1L)); return records.stream() .map(StreamsCleanUpRunnerTest::toKeyValue) .collect(Collectors.toList()); diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpRunnerTest.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpRunnerTest.java index 5e8cfcea..007bedde 100644 --- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpRunnerTest.java +++ b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpRunnerTest.java @@ -25,9 +25,7 @@ package com.bakdata.kafka.integration; -import static com.bakdata.kafka.KafkaContainerHelper.DEFAULT_TOPIC_SETTINGS; import static com.bakdata.kafka.integration.StreamsRunnerTest.configureApp; -import static java.util.Collections.emptyMap; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; @@ -38,7 +36,9 @@ import com.bakdata.kafka.ExecutableApp; import com.bakdata.kafka.ExecutableStreamsApp; import com.bakdata.kafka.HasTopicHooks.TopicHook; -import com.bakdata.kafka.KafkaContainerHelper; +import com.bakdata.kafka.KafkaTest; +import com.bakdata.kafka.KafkaTestClient; +import com.bakdata.kafka.SenderBuilder.SimpleProducerRecord; import com.bakdata.kafka.StreamsApp; import com.bakdata.kafka.StreamsCleanUpConfiguration; import com.bakdata.kafka.StreamsCleanUpRunner; @@ -52,6 +52,7 @@ import com.bakdata.kafka.test_applications.WordCountPattern; import com.bakdata.kafka.util.ConsumerGroupClient; import com.bakdata.kafka.util.ImprovedAdminClient; +import com.bakdata.kafka.util.TopicClient; import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException; import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig; @@ -145,16 +146,13 @@ void shouldDeleteTopic() throws InterruptedException { try (final ConfiguredStreamsApp app = createWordCountApplication(); final ExecutableStreamsApp executableApp = app.withEndpoint( this.createEndpointWithoutSchemaRegistry())) { - final KafkaContainerHelper kafkaContainerHelper = this.newContainerHelper(); - try (final ImprovedAdminClient admin = kafkaContainerHelper.admin()) { - admin.getTopicClient() - .createTopic(app.getTopics().getOutputTopic(), DEFAULT_TOPIC_SETTINGS, emptyMap()); - } - kafkaContainerHelper.send() + final KafkaTestClient testClient = this.newTestClient(); + testClient.createTopic(app.getTopics().getOutputTopic()); + testClient.send() .to(app.getTopics().getInputTopics().get(0), List.of( - new KeyValue<>(null, "blub"), - new KeyValue<>(null, "bla"), - new KeyValue<>(null, "blub") + new SimpleProducerRecord<>(null, "blub"), + new SimpleProducerRecord<>(null, "bla"), + new SimpleProducerRecord<>(null, "blub") )); final List> expectedValues = @@ -170,8 +168,9 @@ void shouldDeleteTopic() throws InterruptedException { Thread.sleep(TIMEOUT.toMillis()); clean(executableApp); - try (final ImprovedAdminClient admin = kafkaContainerHelper.admin()) { - this.softly.assertThat(admin.getTopicClient().exists(app.getTopics().getOutputTopic())) + try (final ImprovedAdminClient admin = testClient.admin(); + final TopicClient topicClient = admin.getTopicClient()) { + this.softly.assertThat(topicClient.exists(app.getTopics().getOutputTopic())) .as("Output topic is deleted") .isFalse(); } @@ -183,16 +182,13 @@ void shouldDeleteConsumerGroup() throws InterruptedException { try (final ConfiguredStreamsApp app = createWordCountApplication(); final ExecutableStreamsApp executableApp = app.withEndpoint( this.createEndpointWithoutSchemaRegistry())) { - final KafkaContainerHelper kafkaContainerHelper = this.newContainerHelper(); - try (final ImprovedAdminClient admin = kafkaContainerHelper.admin()) { - admin.getTopicClient() - .createTopic(app.getTopics().getOutputTopic(), DEFAULT_TOPIC_SETTINGS, emptyMap()); - } - kafkaContainerHelper.send() + final KafkaTestClient testClient = this.newTestClient(); + testClient.createTopic(app.getTopics().getOutputTopic()); + testClient.send() .to(app.getTopics().getInputTopics().get(0), List.of( - new KeyValue<>(null, "blub"), - new KeyValue<>(null, "bla"), - new KeyValue<>(null, "blub") + new SimpleProducerRecord<>(null, "blub"), + new SimpleProducerRecord<>(null, "bla"), + new SimpleProducerRecord<>(null, "blub") )); final List> expectedValues = @@ -229,16 +225,13 @@ void shouldNotThrowAnErrorIfConsumerGroupDoesNotExist() throws InterruptedExcept try (final ConfiguredStreamsApp app = createWordCountApplication(); final ExecutableStreamsApp executableApp = app.withEndpoint( this.createEndpointWithoutSchemaRegistry())) { - final KafkaContainerHelper kafkaContainerHelper = this.newContainerHelper(); - try (final ImprovedAdminClient admin = kafkaContainerHelper.admin()) { - admin.getTopicClient() - .createTopic(app.getTopics().getOutputTopic(), DEFAULT_TOPIC_SETTINGS, emptyMap()); - } - kafkaContainerHelper.send() + final KafkaTestClient testClient = this.newTestClient(); + testClient.createTopic(app.getTopics().getOutputTopic()); + testClient.send() .to(app.getTopics().getInputTopics().get(0), List.of( - new KeyValue<>(null, "blub"), - new KeyValue<>(null, "bla"), - new KeyValue<>(null, "blub") + new SimpleProducerRecord<>(null, "blub"), + new SimpleProducerRecord<>(null, "bla"), + new SimpleProducerRecord<>(null, "blub") )); final List> expectedValues = @@ -277,17 +270,14 @@ void shouldDeleteInternalTopics() throws InterruptedException { final ExecutableStreamsApp executableApp = app.withEndpoint(this.createEndpoint())) { final TestRecord testRecord = TestRecord.newBuilder().setContent("key 1").build(); - final KafkaContainerHelper kafkaContainerHelper = this.newContainerHelper(); - try (final ImprovedAdminClient admin = kafkaContainerHelper.admin()) { - admin.getTopicClient() - .createTopic(app.getTopics().getOutputTopic(), DEFAULT_TOPIC_SETTINGS, emptyMap()); - } - kafkaContainerHelper.send() + final KafkaTestClient testClient = this.newTestClient(); + testClient.createTopic(app.getTopics().getOutputTopic()); + testClient.send() .with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, this.getSchemaRegistryUrl()) .with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()) .with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName()) .to(app.getTopics().getInputTopics().get(0), List.of( - new KeyValue<>("key 1", testRecord) + new SimpleProducerRecord<>("key 1", testRecord) )); run(executableApp); @@ -300,25 +290,27 @@ void shouldDeleteInternalTopics() throws InterruptedException { uniqueAppId + "-KSTREAM-REDUCE-STATE-STORE-0000000003-changelog"; final String manualTopic = ComplexTopologyApplication.THROUGH_TOPIC; - try (final ImprovedAdminClient admin = kafkaContainerHelper.admin()) { + try (final ImprovedAdminClient admin = testClient.admin(); + final TopicClient topicClient = admin.getTopicClient()) { for (final String inputTopic : inputTopics) { - this.softly.assertThat(admin.getTopicClient().exists(inputTopic)).isTrue(); + this.softly.assertThat(topicClient.exists(inputTopic)).isTrue(); } - this.softly.assertThat(admin.getTopicClient().exists(internalTopic)).isTrue(); - this.softly.assertThat(admin.getTopicClient().exists(backingTopic)).isTrue(); - this.softly.assertThat(admin.getTopicClient().exists(manualTopic)).isTrue(); + this.softly.assertThat(topicClient.exists(internalTopic)).isTrue(); + this.softly.assertThat(topicClient.exists(backingTopic)).isTrue(); + this.softly.assertThat(topicClient.exists(manualTopic)).isTrue(); } Thread.sleep(TIMEOUT.toMillis()); reset(executableApp); - try (final ImprovedAdminClient admin = kafkaContainerHelper.admin()) { + try (final ImprovedAdminClient admin = testClient.admin(); + final TopicClient topicClient = admin.getTopicClient()) { for (final String inputTopic : inputTopics) { - this.softly.assertThat(admin.getTopicClient().exists(inputTopic)).isTrue(); + this.softly.assertThat(topicClient.exists(inputTopic)).isTrue(); } - this.softly.assertThat(admin.getTopicClient().exists(internalTopic)).isFalse(); - this.softly.assertThat(admin.getTopicClient().exists(backingTopic)).isFalse(); - this.softly.assertThat(admin.getTopicClient().exists(manualTopic)).isTrue(); + this.softly.assertThat(topicClient.exists(internalTopic)).isFalse(); + this.softly.assertThat(topicClient.exists(backingTopic)).isFalse(); + this.softly.assertThat(topicClient.exists(manualTopic)).isTrue(); } } } @@ -329,17 +321,14 @@ void shouldDeleteIntermediateTopics() throws InterruptedException { final ExecutableStreamsApp executableApp = app.withEndpoint(this.createEndpoint())) { final TestRecord testRecord = TestRecord.newBuilder().setContent("key 1").build(); - final KafkaContainerHelper kafkaContainerHelper = this.newContainerHelper(); - try (final ImprovedAdminClient admin = kafkaContainerHelper.admin()) { - admin.getTopicClient() - .createTopic(app.getTopics().getOutputTopic(), DEFAULT_TOPIC_SETTINGS, emptyMap()); - } - kafkaContainerHelper.send() + final KafkaTestClient testClient = this.newTestClient(); + testClient.createTopic(app.getTopics().getOutputTopic()); + testClient.send() .with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, this.getSchemaRegistryUrl()) .with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()) .with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName()) .to(app.getTopics().getInputTopics().get(0), List.of( - new KeyValue<>("key 1", testRecord) + new SimpleProducerRecord<>("key 1", testRecord) )); run(executableApp); @@ -347,21 +336,23 @@ void shouldDeleteIntermediateTopics() throws InterruptedException { final List inputTopics = app.getTopics().getInputTopics(); final String manualTopic = ComplexTopologyApplication.THROUGH_TOPIC; - try (final ImprovedAdminClient admin = kafkaContainerHelper.admin()) { + try (final ImprovedAdminClient admin = testClient.admin(); + final TopicClient topicClient = admin.getTopicClient()) { for (final String inputTopic : inputTopics) { - this.softly.assertThat(admin.getTopicClient().exists(inputTopic)).isTrue(); + this.softly.assertThat(topicClient.exists(inputTopic)).isTrue(); } - this.softly.assertThat(admin.getTopicClient().exists(manualTopic)).isTrue(); + this.softly.assertThat(topicClient.exists(manualTopic)).isTrue(); } Thread.sleep(TIMEOUT.toMillis()); clean(executableApp); - try (final ImprovedAdminClient admin = kafkaContainerHelper.admin()) { + try (final ImprovedAdminClient admin = testClient.admin(); + final TopicClient topicClient = admin.getTopicClient()) { for (final String inputTopic : inputTopics) { - this.softly.assertThat(admin.getTopicClient().exists(inputTopic)).isTrue(); + this.softly.assertThat(topicClient.exists(inputTopic)).isTrue(); } - this.softly.assertThat(admin.getTopicClient().exists(manualTopic)).isFalse(); + this.softly.assertThat(topicClient.exists(manualTopic)).isFalse(); } } } @@ -371,16 +362,13 @@ void shouldDeleteState() throws InterruptedException { try (final ConfiguredStreamsApp app = createWordCountApplication(); final ExecutableStreamsApp executableApp = app.withEndpoint( this.createEndpointWithoutSchemaRegistry())) { - final KafkaContainerHelper kafkaContainerHelper = this.newContainerHelper(); - try (final ImprovedAdminClient admin = kafkaContainerHelper.admin()) { - admin.getTopicClient() - .createTopic(app.getTopics().getOutputTopic(), DEFAULT_TOPIC_SETTINGS, emptyMap()); - } - kafkaContainerHelper.send() + final KafkaTestClient testClient = this.newTestClient(); + testClient.createTopic(app.getTopics().getOutputTopic()); + testClient.send() .to(app.getTopics().getInputTopics().get(0), List.of( - new KeyValue<>(null, "blub"), - new KeyValue<>(null, "bla"), - new KeyValue<>(null, "blub") + new SimpleProducerRecord<>(null, "blub"), + new SimpleProducerRecord<>(null, "bla"), + new SimpleProducerRecord<>(null, "blub") )); final List> expectedValues = @@ -410,16 +398,13 @@ void shouldReprocessAlreadySeenRecords() throws InterruptedException { try (final ConfiguredStreamsApp app = createWordCountApplication(); final ExecutableStreamsApp executableApp = app.withEndpoint( this.createEndpointWithoutSchemaRegistry())) { - final KafkaContainerHelper kafkaContainerHelper = this.newContainerHelper(); - try (final ImprovedAdminClient admin = kafkaContainerHelper.admin()) { - admin.getTopicClient() - .createTopic(app.getTopics().getOutputTopic(), DEFAULT_TOPIC_SETTINGS, emptyMap()); - } - kafkaContainerHelper.send() + final KafkaTestClient testClient = this.newTestClient(); + testClient.createTopic(app.getTopics().getOutputTopic()); + testClient.send() .to(app.getTopics().getInputTopics().get(0), List.of( - new KeyValue<>(null, "a"), - new KeyValue<>(null, "b"), - new KeyValue<>(null, "c") + new SimpleProducerRecord<>(null, "a"), + new SimpleProducerRecord<>(null, "b"), + new SimpleProducerRecord<>(null, "c") )); run(executableApp); @@ -444,16 +429,13 @@ void shouldDeleteValueSchema() final SchemaRegistryClient client = this.getSchemaRegistryClient()) { final TestRecord testRecord = TestRecord.newBuilder().setContent("key 1").build(); final String inputTopic = app.getTopics().getInputTopics().get(0); - final KafkaContainerHelper kafkaContainerHelper = this.newContainerHelper(); - try (final ImprovedAdminClient admin = kafkaContainerHelper.admin()) { - admin.getTopicClient() - .createTopic(app.getTopics().getOutputTopic(), DEFAULT_TOPIC_SETTINGS, emptyMap()); - } - kafkaContainerHelper.send() + final KafkaTestClient testClient = this.newTestClient(); + testClient.createTopic(app.getTopics().getOutputTopic()); + testClient.send() .with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, this.getSchemaRegistryUrl()) .with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName()) .to(app.getTopics().getInputTopics().get(0), List.of( - new KeyValue<>(null, testRecord) + new SimpleProducerRecord<>(null, testRecord) )); run(executableApp); @@ -478,16 +460,13 @@ void shouldDeleteKeySchema() final SchemaRegistryClient client = this.getSchemaRegistryClient()) { final TestRecord testRecord = TestRecord.newBuilder().setContent("key 1").build(); final String inputTopic = app.getTopics().getInputTopics().get(0); - final KafkaContainerHelper kafkaContainerHelper = this.newContainerHelper(); - try (final ImprovedAdminClient admin = kafkaContainerHelper.admin()) { - admin.getTopicClient() - .createTopic(app.getTopics().getOutputTopic(), DEFAULT_TOPIC_SETTINGS, emptyMap()); - } - kafkaContainerHelper.send() + final KafkaTestClient testClient = this.newTestClient(); + testClient.createTopic(app.getTopics().getOutputTopic()); + testClient.send() .with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, this.getSchemaRegistryUrl()) .with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName()) .to(app.getTopics().getInputTopics().get(0), List.of( - new KeyValue<>(testRecord, "val") + new SimpleProducerRecord<>(testRecord, "val") )); run(executableApp); @@ -512,17 +491,14 @@ void shouldDeleteSchemaOfInternalTopics() final SchemaRegistryClient client = this.getSchemaRegistryClient()) { final TestRecord testRecord = TestRecord.newBuilder().setContent("key 1").build(); final String inputTopic = app.getTopics().getInputTopics().get(0); - final KafkaContainerHelper kafkaContainerHelper = this.newContainerHelper(); - try (final ImprovedAdminClient admin = kafkaContainerHelper.admin()) { - admin.getTopicClient() - .createTopic(app.getTopics().getOutputTopic(), DEFAULT_TOPIC_SETTINGS, emptyMap()); - } - kafkaContainerHelper.send() + final KafkaTestClient testClient = this.newTestClient(); + testClient.createTopic(app.getTopics().getOutputTopic()); + testClient.send() .with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, this.getSchemaRegistryUrl()) .with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()) .with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName()) .to(app.getTopics().getInputTopics().get(0), List.of( - new KeyValue<>("key 1", testRecord) + new SimpleProducerRecord<>("key 1", testRecord) )); run(executableApp); @@ -554,17 +530,14 @@ void shouldDeleteSchemaOfIntermediateTopics() final SchemaRegistryClient client = this.getSchemaRegistryClient()) { final TestRecord testRecord = TestRecord.newBuilder().setContent("key 1").build(); final String inputTopic = app.getTopics().getInputTopics().get(0); - final KafkaContainerHelper kafkaContainerHelper = this.newContainerHelper(); - try (final ImprovedAdminClient admin = kafkaContainerHelper.admin()) { - admin.getTopicClient() - .createTopic(app.getTopics().getOutputTopic(), DEFAULT_TOPIC_SETTINGS, emptyMap()); - } - kafkaContainerHelper.send() + final KafkaTestClient testClient = this.newTestClient(); + testClient.createTopic(app.getTopics().getOutputTopic()); + testClient.send() .with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, this.getSchemaRegistryUrl()) .with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()) .with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName()) .to(app.getTopics().getInputTopics().get(0), List.of( - new KeyValue<>("key 1", testRecord) + new SimpleProducerRecord<>("key 1", testRecord) )); run(executableApp); @@ -627,11 +600,8 @@ void shouldThrowExceptionOnResetterError() throws InterruptedException { final ExecutableStreamsApp executableApp = app.withEndpoint( this.createEndpoint()); final StreamsRunner runner = executableApp.createRunner()) { - final KafkaContainerHelper kafkaContainerHelper = this.newContainerHelper(); - try (final ImprovedAdminClient admin = kafkaContainerHelper.admin()) { - admin.getTopicClient() - .createTopic(app.getTopics().getInputTopics().get(0), DEFAULT_TOPIC_SETTINGS, emptyMap()); - } + final KafkaTestClient testClient = this.newTestClient(); + testClient.createTopic(app.getTopics().getInputTopics().get(0)); StreamsRunnerTest.run(runner); // Wait until stream application has consumed all data Thread.sleep(TIMEOUT.toMillis()); @@ -647,19 +617,16 @@ void shouldReprocessAlreadySeenRecordsWithPattern() throws InterruptedException try (final ConfiguredStreamsApp app = createWordCountPatternApplication(); final ExecutableStreamsApp executableApp = app.withEndpoint( this.createEndpointWithoutSchemaRegistry())) { - final KafkaContainerHelper kafkaContainerHelper = this.newContainerHelper(); - try (final ImprovedAdminClient admin = kafkaContainerHelper.admin()) { - admin.getTopicClient() - .createTopic(app.getTopics().getOutputTopic(), DEFAULT_TOPIC_SETTINGS, emptyMap()); - } - kafkaContainerHelper.send() + final KafkaTestClient testClient = this.newTestClient(); + testClient.createTopic(app.getTopics().getOutputTopic()); + testClient.send() .to("input_topic", List.of( - new KeyValue<>(null, "a"), - new KeyValue<>(null, "b") + new SimpleProducerRecord<>(null, "a"), + new SimpleProducerRecord<>(null, "b") )); - kafkaContainerHelper.send() + testClient.send() .to("another_topic", List.of( - new KeyValue<>(null, "c") + new SimpleProducerRecord<>(null, "c") )); run(executableApp); @@ -677,10 +644,7 @@ void shouldReprocessAlreadySeenRecordsWithPattern() throws InterruptedException } private ConfiguredStreamsApp createComplexApplication() { - try (final ImprovedAdminClient admin = this.newContainerHelper().admin()) { - admin.getTopicClient() - .createTopic(ComplexTopologyApplication.THROUGH_TOPIC, DEFAULT_TOPIC_SETTINGS, emptyMap()); - } + this.newTestClient().createTopic(ComplexTopologyApplication.THROUGH_TOPIC); return configureApp(new ComplexTopologyApplication(), StreamsTopicConfig.builder() .inputTopics(List.of("input")) .outputTopic("output") @@ -688,10 +652,7 @@ private ConfiguredStreamsApp createComplexApplication() { } private ConfiguredStreamsApp createComplexCleanUpHookApplication() { - try (final ImprovedAdminClient admin = this.newContainerHelper().admin()) { - admin.getTopicClient() - .createTopic(ComplexTopologyApplication.THROUGH_TOPIC, DEFAULT_TOPIC_SETTINGS, emptyMap()); - } + this.newTestClient().createTopic(ComplexTopologyApplication.THROUGH_TOPIC); return configureApp(new ComplexTopologyApplication() { @Override public StreamsCleanUpConfiguration setupCleanUp( @@ -710,7 +671,7 @@ private ImprovedAdminClient createAdminClient() { } private List> readOutputTopic(final String outputTopic) { - final List> records = this.newContainerHelper().read() + final List> records = this.newTestClient().read() .with(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class) .from(outputTopic, TIMEOUT); return records.stream() diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/StreamsRunnerTest.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/StreamsRunnerTest.java index 7e9441ba..8e62b6c5 100644 --- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/StreamsRunnerTest.java +++ b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/StreamsRunnerTest.java @@ -24,15 +24,16 @@ package com.bakdata.kafka.integration; -import static com.bakdata.kafka.KafkaContainerHelper.DEFAULT_TOPIC_SETTINGS; -import static java.util.Collections.emptyMap; +import static com.bakdata.kafka.KafkaTestClient.defaultTopicSettings; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import com.bakdata.kafka.AppConfiguration; import com.bakdata.kafka.ConfiguredStreamsApp; -import com.bakdata.kafka.KafkaContainerHelper; +import com.bakdata.kafka.KafkaTest; +import com.bakdata.kafka.KafkaTestClient; +import com.bakdata.kafka.SenderBuilder.SimpleProducerRecord; import com.bakdata.kafka.SerdeConfig; import com.bakdata.kafka.StreamsApp; import com.bakdata.kafka.StreamsExecutionOptions; @@ -42,6 +43,7 @@ import com.bakdata.kafka.test_applications.LabeledInputTopics; import com.bakdata.kafka.test_applications.Mirror; import com.bakdata.kafka.util.ImprovedAdminClient; +import com.bakdata.kafka.util.TopicClient; import java.lang.Thread.UncaughtExceptionHandler; import java.time.Duration; import java.util.List; @@ -54,7 +56,6 @@ import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.streams.KafkaStreams.State; import org.apache.kafka.streams.KafkaStreams.StateListener; -import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.errors.MissingSourceTopicException; import org.apache.kafka.streams.errors.StreamsException; @@ -128,16 +129,14 @@ void shouldRunApp() { .createRunner()) { final String inputTopic = app.getTopics().getInputTopics().get(0); final String outputTopic = app.getTopics().getOutputTopic(); - final KafkaContainerHelper kafkaContainerHelper = this.newContainerHelper(); - try (final ImprovedAdminClient admin = kafkaContainerHelper.admin()) { - admin.getTopicClient().createTopic(outputTopic, DEFAULT_TOPIC_SETTINGS, emptyMap()); - } + final KafkaTestClient testClient = this.newTestClient(); + testClient.createTopic(outputTopic); run(runner); - kafkaContainerHelper.send() + testClient.send() .with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class) .with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class) - .to(inputTopic, List.of(new KeyValue<>("foo", "bar"))); - this.softly.assertThat(kafkaContainerHelper.read() + .to(inputTopic, List.of(new SimpleProducerRecord<>("foo", "bar"))); + this.softly.assertThat(testClient.read() .with(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class) .with(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class) .from(outputTopic, TIMEOUT)) @@ -154,22 +153,24 @@ void shouldUseMultipleLabeledInputTopics() { final String inputTopic1 = inputTopics.get(0); final String inputTopic2 = inputTopics.get(1); final String outputTopic = app.getTopics().getOutputTopic(); - final KafkaContainerHelper kafkaContainerHelper = this.newContainerHelper(); - try (final ImprovedAdminClient admin = kafkaContainerHelper.admin()) { - admin.getTopicClient().createTopic(inputTopic1, DEFAULT_TOPIC_SETTINGS, emptyMap()); - admin.getTopicClient().createTopic(inputTopic2, DEFAULT_TOPIC_SETTINGS, emptyMap()); - admin.getTopicClient().createTopic(outputTopic, DEFAULT_TOPIC_SETTINGS, emptyMap()); + final KafkaTestClient testClient = this.newTestClient(); + try (final ImprovedAdminClient admin = testClient.admin()) { + try (final TopicClient topicClient = admin.getTopicClient()) { + topicClient.createTopic(inputTopic1, defaultTopicSettings()); + topicClient.createTopic(inputTopic2, defaultTopicSettings()); + topicClient.createTopic(outputTopic, defaultTopicSettings()); + } } run(runner); - kafkaContainerHelper.send() + testClient.send() .with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class) .with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class) - .to(inputTopic1, List.of(new KeyValue<>("foo", "bar"))); - kafkaContainerHelper.send() + .to(inputTopic1, List.of(new SimpleProducerRecord<>("foo", "bar"))); + testClient.send() .with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class) .with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class) - .to(inputTopic2, List.of(new KeyValue<>("foo", "baz"))); - this.softly.assertThat(kafkaContainerHelper.read() + .to(inputTopic2, List.of(new SimpleProducerRecord<>("foo", "baz"))); + this.softly.assertThat(testClient.read() .with(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class) .with(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class) .from(outputTopic, TIMEOUT)) @@ -208,17 +209,15 @@ void shouldCloseOnMapError() throws InterruptedException { .build())) { final String inputTopic = app.getTopics().getInputTopics().get(0); final String outputTopic = app.getTopics().getOutputTopic(); - final KafkaContainerHelper kafkaContainerHelper = this.newContainerHelper(); - try (final ImprovedAdminClient admin = kafkaContainerHelper.admin()) { - admin.getTopicClient().createTopic(outputTopic, DEFAULT_TOPIC_SETTINGS, emptyMap()); - } + final KafkaTestClient testClient = this.newTestClient(); + testClient.createTopic(outputTopic); final Thread thread = run(runner); final CapturingUncaughtExceptionHandler handler = (CapturingUncaughtExceptionHandler) thread.getUncaughtExceptionHandler(); - kafkaContainerHelper.send() + testClient.send() .with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class) .with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class) - .to(inputTopic, List.of(new KeyValue<>("foo", "bar"))); + .to(inputTopic, List.of(new SimpleProducerRecord<>("foo", "bar"))); Thread.sleep(TIMEOUT.toMillis()); this.softly.assertThat(thread.isAlive()).isFalse(); this.softly.assertThat(handler.getLastException()).isInstanceOf(StreamsException.class) diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/util/SchemaTopicClientTest.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/util/SchemaTopicClientTest.java index 2775ef33..d6d21a06 100644 --- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/util/SchemaTopicClientTest.java +++ b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/util/SchemaTopicClientTest.java @@ -25,13 +25,12 @@ package com.bakdata.kafka.util; -import static com.bakdata.kafka.KafkaContainerHelper.DEFAULT_TOPIC_SETTINGS; -import static com.bakdata.kafka.TestUtil.newKafkaCluster; -import static java.util.Collections.emptyMap; +import static com.bakdata.kafka.KafkaTestClient.defaultTopicSettings; -import com.bakdata.kafka.KafkaContainerHelper; +import com.bakdata.kafka.KafkaTest; +import com.bakdata.kafka.KafkaTestClient; +import com.bakdata.kafka.SenderBuilder.SimpleProducerRecord; import com.bakdata.kafka.TestRecord; -import com.bakdata.kafka.TestTopologyFactory; import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException; import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig; @@ -43,25 +42,17 @@ import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.streams.KeyValue; import org.assertj.core.api.SoftAssertions; import org.assertj.core.api.junit.jupiter.InjectSoftAssertions; import org.assertj.core.api.junit.jupiter.SoftAssertionsExtension; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; -import org.testcontainers.junit.jupiter.Container; -import org.testcontainers.junit.jupiter.Testcontainers; -import org.testcontainers.kafka.KafkaContainer; -@Testcontainers @Slf4j @ExtendWith(SoftAssertionsExtension.class) -class SchemaTopicClientTest { +class SchemaTopicClientTest extends KafkaTest { private static final Duration TIMEOUT = Duration.ofSeconds(10); private static final String TOPIC = "topic"; - private final TestTopologyFactory testTopologyFactory = TestTopologyFactory.withSchemaRegistry(); - @Container - private final KafkaContainer kafkaCluster = newKafkaCluster(); @InjectSoftAssertions SoftAssertions softly; @@ -69,19 +60,19 @@ class SchemaTopicClientTest { @Test void shouldDeleteTopicAndSchemaWhenSchemaRegistryUrlIsSet() throws InterruptedException, IOException, RestClientException { - final KafkaContainerHelper kafkaContainerHelper = new KafkaContainerHelper(this.kafkaCluster); - try (final ImprovedAdminClient admin = kafkaContainerHelper.admin()) { - final TopicClient topicClient = admin.getTopicClient(); - topicClient.createTopic(TOPIC, DEFAULT_TOPIC_SETTINGS, emptyMap()); + final KafkaTestClient testClient = this.newTestClient(); + try (final ImprovedAdminClient admin = testClient.admin(); + final TopicClient topicClient = admin.getTopicClient()) { + topicClient.createTopic(TOPIC, defaultTopicSettings()); this.softly.assertThat(topicClient.exists(TOPIC)) .as("Topic is created") .isTrue(); - kafkaContainerHelper.send() + testClient.send() .with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, SpecificAvroSerializer.class) .with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, this.getSchemaRegistryUrl()) .to(TOPIC, List.of( - new KeyValue<>(null, TestRecord.newBuilder().setContent("foo").build()) + new SimpleProducerRecord<>(null, TestRecord.newBuilder().setContent("foo").build()) )); final SchemaRegistryClient client = this.getSchemaRegistryClient(); @@ -103,19 +94,19 @@ void shouldDeleteTopicAndSchemaWhenSchemaRegistryUrlIsSet() @Test void shouldResetSchema() throws InterruptedException, IOException, RestClientException { - final KafkaContainerHelper kafkaContainerHelper = new KafkaContainerHelper(this.kafkaCluster); - try (final ImprovedAdminClient admin = kafkaContainerHelper.admin()) { - final TopicClient topicClient = admin.getTopicClient(); - topicClient.createTopic(TOPIC, DEFAULT_TOPIC_SETTINGS, emptyMap()); + final KafkaTestClient testClient = this.newTestClient(); + try (final ImprovedAdminClient admin = testClient.admin(); + final TopicClient topicClient = admin.getTopicClient()) { + topicClient.createTopic(TOPIC, defaultTopicSettings()); this.softly.assertThat(topicClient.exists(TOPIC)) .as("Topic is created") .isTrue(); - kafkaContainerHelper.send() + testClient.send() .with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, SpecificAvroSerializer.class) .with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, this.getSchemaRegistryUrl()) .to(TOPIC, List.of( - new KeyValue<>(null, TestRecord.newBuilder().setContent("foo").build()) + new SimpleProducerRecord<>(null, TestRecord.newBuilder().setContent("foo").build()) )); final SchemaRegistryClient client = this.getSchemaRegistryClient(); @@ -138,19 +129,19 @@ void shouldResetSchema() throws InterruptedException, IOException, RestClientExc @Test void shouldDeleteTopicAndKeepSchemaWhenSchemaRegistryUrlIsNotSet() throws InterruptedException, RestClientException, IOException { - final KafkaContainerHelper kafkaContainerHelper = new KafkaContainerHelper(this.kafkaCluster); - try (final ImprovedAdminClient admin = kafkaContainerHelper.admin()) { - final TopicClient topicClient = admin.getTopicClient(); - topicClient.createTopic(TOPIC, DEFAULT_TOPIC_SETTINGS, emptyMap()); + final KafkaTestClient testClient = this.newTestClient(); + try (final ImprovedAdminClient admin = testClient.admin(); + final TopicClient topicClient = admin.getTopicClient()) { + topicClient.createTopic(TOPIC, defaultTopicSettings()); this.softly.assertThat(topicClient.exists(TOPIC)) .as("Topic is created") .isTrue(); - kafkaContainerHelper.send() + testClient.send() .with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, SpecificAvroSerializer.class) .with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, this.getSchemaRegistryUrl()) .to(TOPIC, List.of( - new KeyValue<>(null, TestRecord.newBuilder().setContent("foo").build()) + new SimpleProducerRecord<>(null, TestRecord.newBuilder().setContent("foo").build()) )); final SchemaRegistryClient client = this.getSchemaRegistryClient(); @@ -171,24 +162,16 @@ void shouldDeleteTopicAndKeepSchemaWhenSchemaRegistryUrlIsNotSet() throws Interr private SchemaTopicClient createClientWithSchemaRegistry() { final Map kafkaProperties = Map.of( - AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, this.kafkaCluster.getBootstrapServers() + AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, this.getBootstrapServers() ); return SchemaTopicClient.create(kafkaProperties, this.getSchemaRegistryUrl(), TIMEOUT); } private SchemaTopicClient createClientWithNoSchemaRegistry() { final Map kafkaProperties = Map.of( - AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, this.kafkaCluster.getBootstrapServers() + AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, this.getBootstrapServers() ); return SchemaTopicClient.create(kafkaProperties, TIMEOUT); } - private String getSchemaRegistryUrl() { - return this.testTopologyFactory.getSchemaRegistryUrl(); - } - - private SchemaRegistryClient getSchemaRegistryClient() { - return this.testTopologyFactory.getSchemaRegistryClient(); - } - } diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/util/TopicClientTest.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/util/TopicClientTest.java index ac738b72..a30519b4 100644 --- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/util/TopicClientTest.java +++ b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/util/TopicClientTest.java @@ -24,25 +24,19 @@ package com.bakdata.kafka.util; -import static com.bakdata.kafka.KafkaContainerHelper.DEFAULT_TOPIC_SETTINGS; -import static com.bakdata.kafka.TestUtil.newKafkaCluster; +import static com.bakdata.kafka.KafkaTestClient.defaultTopicSettings; import static java.util.Collections.emptyMap; import static org.assertj.core.api.Assertions.assertThat; +import com.bakdata.kafka.KafkaTest; import java.time.Duration; import java.util.Map; import org.apache.kafka.clients.admin.AdminClientConfig; import org.junit.jupiter.api.Test; -import org.testcontainers.junit.jupiter.Container; -import org.testcontainers.junit.jupiter.Testcontainers; -import org.testcontainers.kafka.KafkaContainer; -@Testcontainers -class TopicClientTest { +class TopicClientTest extends KafkaTest { private static final Duration CLIENT_TIMEOUT = Duration.ofSeconds(10L); - @Container - private final KafkaContainer kafkaCluster = newKafkaCluster(); @Test void shouldNotFindTopic() { @@ -54,7 +48,7 @@ void shouldNotFindTopic() { @Test void shouldFindTopic() throws InterruptedException { try (final TopicClient client = this.createClient()) { - client.createTopic("exists", DEFAULT_TOPIC_SETTINGS, emptyMap()); + client.createTopic("exists", defaultTopicSettings()); } Thread.sleep(CLIENT_TIMEOUT.toMillis()); try (final TopicClient client = this.createClient()) { @@ -65,8 +59,8 @@ void shouldFindTopic() throws InterruptedException { @Test void shouldListTopics() throws InterruptedException { try (final TopicClient client = this.createClient()) { - client.createTopic("foo", DEFAULT_TOPIC_SETTINGS, emptyMap()); - client.createTopic("bar", DEFAULT_TOPIC_SETTINGS, emptyMap()); + client.createTopic("foo", defaultTopicSettings()); + client.createTopic("bar", defaultTopicSettings()); } Thread.sleep(CLIENT_TIMEOUT.toMillis()); try (final TopicClient client = this.createClient()) { @@ -79,7 +73,7 @@ void shouldListTopics() throws InterruptedException { @Test void shouldDeleteTopic() throws InterruptedException { try (final TopicClient client = this.createClient()) { - client.createTopic("foo", DEFAULT_TOPIC_SETTINGS, emptyMap()); + client.createTopic("foo", defaultTopicSettings()); } Thread.sleep(CLIENT_TIMEOUT.toMillis()); try (final TopicClient client = this.createClient()) { @@ -113,7 +107,7 @@ void shouldCreateTopic() throws InterruptedException { } private TopicClient createClient() { - final String brokerList = this.kafkaCluster.getBootstrapServers(); + final String brokerList = this.getBootstrapServers(); final Map config = Map.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList); return TopicClient.create(config, CLIENT_TIMEOUT); } diff --git a/streams-bootstrap-core/src/testFixtures/java/com/bakdata/kafka/KafkaContainerHelper.java b/streams-bootstrap-core/src/testFixtures/java/com/bakdata/kafka/KafkaContainerHelper.java deleted file mode 100644 index 51fa262e..00000000 --- a/streams-bootstrap-core/src/testFixtures/java/com/bakdata/kafka/KafkaContainerHelper.java +++ /dev/null @@ -1,142 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2025 bakdata - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package com.bakdata.kafka; - -import com.bakdata.kafka.util.ImprovedAdminClient; -import com.bakdata.kafka.util.TopicSettings; -import java.time.Duration; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.stream.Collectors; -import lombok.NonNull; -import lombok.RequiredArgsConstructor; -import org.apache.kafka.clients.admin.AdminClientConfig; -import org.apache.kafka.clients.consumer.Consumer; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.Producer; -import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.PartitionInfo; -import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.serialization.StringDeserializer; -import org.apache.kafka.common.serialization.StringSerializer; -import org.apache.kafka.streams.KeyValue; -import org.testcontainers.kafka.KafkaContainer; - -@RequiredArgsConstructor -public class KafkaContainerHelper { - - public static final TopicSettings DEFAULT_TOPIC_SETTINGS = TopicSettings.builder() - .partitions(1) - .replicationFactor((short) 1) - .build(); - private final @NonNull KafkaContainer kafkaCluster; - - private static List> pollAll(final Consumer consumer, final Duration timeout) { - final List> records = new ArrayList<>(); - ConsumerRecords poll = consumer.poll(timeout); - while (!poll.isEmpty()) { - poll.forEach(records::add); - poll = consumer.poll(timeout); - } - return records; - } - - public SenderBuilder send() { - return new SenderBuilder(Map.of( - ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class, - ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class - )); - } - - public ReaderBuilder read() { - return new ReaderBuilder(Map.of( - ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class, - ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class - )); - } - - public ImprovedAdminClient admin() { - return ImprovedAdminClient.create(Map.of( - AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaContainerHelper.this.kafkaCluster.getBootstrapServers() - )); - } - - @RequiredArgsConstructor - public class SenderBuilder { - - private final @NonNull Map properties; - - public SenderBuilder with(final String key, final Object value) { - final Map newProperties = new HashMap<>(this.properties); - newProperties.put(key, value); - return new SenderBuilder(Map.copyOf(newProperties)); - } - - public void to(final String topic, final Iterable> records) { - final Map producerConfig = new HashMap<>(this.properties); - producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, - KafkaContainerHelper.this.kafkaCluster.getBootstrapServers()); - try (final Producer producer = new KafkaProducer<>(producerConfig)) { - records.forEach(kv -> producer.send(new ProducerRecord<>(topic, kv.key, kv.value))); - } - } - - } - - @RequiredArgsConstructor - public class ReaderBuilder { - - private final @NonNull Map properties; - - public ReaderBuilder with(final String key, final Object value) { - final Map newProperties = new HashMap<>(this.properties); - newProperties.put(key, value); - return new ReaderBuilder(Map.copyOf(newProperties)); - } - - public List> from(final String output, final Duration timeout) { - final Map consumerConfig = new HashMap<>(this.properties); - consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, - KafkaContainerHelper.this.kafkaCluster.getBootstrapServers()); - try (final Consumer consumer = new KafkaConsumer<>(consumerConfig)) { - final List partitionInfos = consumer.listTopics().get(output); - final List topicPartitions = partitionInfos.stream() - .map(partition -> new TopicPartition(partition.topic(), partition.partition())) - .collect(Collectors.toList()); - consumer.assign(topicPartitions); - consumer.seekToBeginning(topicPartitions); - return pollAll(consumer, timeout); - } - } - - } -} diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/KafkaTest.java b/streams-bootstrap-core/src/testFixtures/java/com/bakdata/kafka/KafkaTest.java similarity index 64% rename from streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/KafkaTest.java rename to streams-bootstrap-core/src/testFixtures/java/com/bakdata/kafka/KafkaTest.java index f04f2001..4bdbcdfe 100644 --- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/KafkaTest.java +++ b/streams-bootstrap-core/src/testFixtures/java/com/bakdata/kafka/KafkaTest.java @@ -22,45 +22,54 @@ * SOFTWARE. */ -package com.bakdata.kafka.integration; +package com.bakdata.kafka; -import com.bakdata.kafka.KafkaContainerHelper; -import com.bakdata.kafka.KafkaEndpointConfig; -import com.bakdata.kafka.TestTopologyFactory; -import com.bakdata.kafka.TestUtil; import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Testcontainers; import org.testcontainers.kafka.KafkaContainer; +import org.testcontainers.utility.DockerImageName; @Testcontainers -abstract class KafkaTest { +public abstract class KafkaTest { private final TestTopologyFactory testTopologyFactory = TestTopologyFactory.withSchemaRegistry(); @Container - private final KafkaContainer kafkaCluster = TestUtil.newKafkaCluster(); + private final KafkaContainer kafkaCluster = newCluster(); - KafkaEndpointConfig createEndpointWithoutSchemaRegistry() { + public static KafkaContainer newCluster() { + return new KafkaContainer(DockerImageName.parse("apache/kafka-native:3.8.1")); + } + + public static KafkaTestClient newTestClient(final KafkaContainer kafkaContainer) { + return new KafkaTestClient(kafkaContainer.getBootstrapServers()); + } + + protected KafkaEndpointConfig createEndpointWithoutSchemaRegistry() { return KafkaEndpointConfig.builder() - .bootstrapServers(this.kafkaCluster.getBootstrapServers()) + .bootstrapServers(this.getBootstrapServers()) .build(); } - KafkaEndpointConfig createEndpoint() { + protected KafkaEndpointConfig createEndpoint() { return KafkaEndpointConfig.builder() - .bootstrapServers(this.kafkaCluster.getBootstrapServers()) + .bootstrapServers(this.getBootstrapServers()) .schemaRegistryUrl(this.getSchemaRegistryUrl()) .build(); } - KafkaContainerHelper newContainerHelper() { - return new KafkaContainerHelper(this.kafkaCluster); + protected String getBootstrapServers() { + return this.kafkaCluster.getBootstrapServers(); + } + + protected KafkaTestClient newTestClient() { + return newTestClient(this.kafkaCluster); } - String getSchemaRegistryUrl() { + protected String getSchemaRegistryUrl() { return this.testTopologyFactory.getSchemaRegistryUrl(); } - SchemaRegistryClient getSchemaRegistryClient() { + protected SchemaRegistryClient getSchemaRegistryClient() { return this.testTopologyFactory.getSchemaRegistryClient(); } } diff --git a/streams-bootstrap-core/src/testFixtures/java/com/bakdata/kafka/TestUtil.java b/streams-bootstrap-core/src/testFixtures/java/com/bakdata/kafka/TestUtil.java deleted file mode 100644 index d9a4d7fc..00000000 --- a/streams-bootstrap-core/src/testFixtures/java/com/bakdata/kafka/TestUtil.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2025 bakdata - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package com.bakdata.kafka; - -import lombok.experimental.UtilityClass; -import org.testcontainers.kafka.KafkaContainer; -import org.testcontainers.utility.DockerImageName; - -@UtilityClass -public class TestUtil { - public static KafkaContainer newKafkaCluster() { - return new KafkaContainer(DockerImageName.parse("apache/kafka-native:3.8.1")); - } -} diff --git a/streams-bootstrap-test/src/main/java/com/bakdata/kafka/KafkaTestClient.java b/streams-bootstrap-test/src/main/java/com/bakdata/kafka/KafkaTestClient.java new file mode 100644 index 00000000..3ef2bc3e --- /dev/null +++ b/streams-bootstrap-test/src/main/java/com/bakdata/kafka/KafkaTestClient.java @@ -0,0 +1,82 @@ +/* + * MIT License + * + * Copyright (c) 2025 bakdata + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.bakdata.kafka; + +import com.bakdata.kafka.util.ImprovedAdminClient; +import com.bakdata.kafka.util.TopicClient; +import com.bakdata.kafka.util.TopicSettings; +import java.util.Map; +import lombok.NonNull; +import lombok.RequiredArgsConstructor; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; + +@RequiredArgsConstructor +public class KafkaTestClient { + + private static final TopicSettings DEFAULT_TOPIC_SETTINGS = TopicSettings.builder() + .partitions(1) + .replicationFactor((short) 1) + .build(); + private final @NonNull String bootstrapServers; + + public static TopicSettings defaultTopicSettings() { + return DEFAULT_TOPIC_SETTINGS; + } + + public SenderBuilder send() { + return new SenderBuilder() + .with(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers) + .with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class) + .with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + } + + public ReaderBuilder read() { + return new ReaderBuilder() + .with(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers) + .with(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class) + .with(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + } + + public ImprovedAdminClient admin() { + return ImprovedAdminClient.create(Map.of( + AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaTestClient.this.bootstrapServers + )); + } + + public void createTopic(final String topicName, final TopicSettings settings) { + try (final ImprovedAdminClient admin = this.admin(); + final TopicClient topicClient = admin.getTopicClient()) { + topicClient.createTopic(topicName, settings); + } + } + + public void createTopic(final String topicName) { + this.createTopic(topicName, defaultTopicSettings()); + } +} diff --git a/streams-bootstrap-test/src/main/java/com/bakdata/kafka/ReaderBuilder.java b/streams-bootstrap-test/src/main/java/com/bakdata/kafka/ReaderBuilder.java new file mode 100644 index 00000000..87818245 --- /dev/null +++ b/streams-bootstrap-test/src/main/java/com/bakdata/kafka/ReaderBuilder.java @@ -0,0 +1,84 @@ +/* + * MIT License + * + * Copyright (c) 2025 bakdata + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.bakdata.kafka; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import lombok.NonNull; +import lombok.RequiredArgsConstructor; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.TopicPartition; + +@RequiredArgsConstructor +public class ReaderBuilder { + + private final @NonNull Map properties; + + public ReaderBuilder() { + this(new HashMap<>()); + } + + private static List> pollAll(final Consumer consumer, final Duration timeout) { + final List> records = new ArrayList<>(); + ConsumerRecords poll; + do { + poll = consumer.poll(timeout); + poll.forEach(records::add); + } while (!poll.isEmpty()); + return records; + } + + private static List> readAll(final Consumer consumer, final String topic, + final Duration timeout) { + final List partitionInfos = consumer.listTopics().get(topic); + final List topicPartitions = partitionInfos.stream() + .map(partition -> new TopicPartition(partition.topic(), partition.partition())) + .collect(Collectors.toList()); + consumer.assign(topicPartitions); + consumer.seekToBeginning(topicPartitions); + return pollAll(consumer, timeout); + } + + public ReaderBuilder with(final String key, final Object value) { + final Map newProperties = new HashMap<>(this.properties); + newProperties.put(key, value); + return new ReaderBuilder(Map.copyOf(newProperties)); + } + + public List> from(final String output, final Duration timeout) { + try (final Consumer consumer = new KafkaConsumer<>(this.properties)) { + return readAll(consumer, output, timeout); + } + } + +} diff --git a/streams-bootstrap-test/src/main/java/com/bakdata/kafka/SenderBuilder.java b/streams-bootstrap-test/src/main/java/com/bakdata/kafka/SenderBuilder.java new file mode 100644 index 00000000..e15256cd --- /dev/null +++ b/streams-bootstrap-test/src/main/java/com/bakdata/kafka/SenderBuilder.java @@ -0,0 +1,84 @@ +/* + * MIT License + * + * Copyright (c) 2025 bakdata + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.bakdata.kafka; + +import java.time.Instant; +import java.util.HashMap; +import java.util.Map; +import lombok.NonNull; +import lombok.RequiredArgsConstructor; +import lombok.Value; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.header.Header; + +@RequiredArgsConstructor +public class SenderBuilder { + + private final @NonNull Map properties; + + public SenderBuilder() { + this(new HashMap<>()); + } + + public SenderBuilder with(final String key, final Object value) { + final Map newProperties = new HashMap<>(this.properties); + newProperties.put(key, value); + return new SenderBuilder(Map.copyOf(newProperties)); + } + + public void to(final String topic, final Iterable> records) { + try (final Producer producer = new KafkaProducer<>(this.properties)) { + records.forEach(kv -> producer.send(kv.toProducerRecord(topic))); + } + } + + @Value + @RequiredArgsConstructor + public static class SimpleProducerRecord { + K key; + V value; + Instant timestamp; + Iterable
headers; + + public SimpleProducerRecord(final K key, final V value) { + this(key, value, (Instant) null); + } + + public SimpleProducerRecord(final K key, final V value, final Instant timestamp) { + this(key, value, timestamp, null); + } + + public SimpleProducerRecord(final K key, final V value, final Iterable
headers) { + this(key, value, null, headers); + } + + private ProducerRecord toProducerRecord(final String topic) { + return new ProducerRecord<>(topic, null, this.timestamp.toEpochMilli(), this.key, this.value, this.headers); + } + } + +}