From 4c739232c1ded2497f9caafbded9856bceb0c2e9 Mon Sep 17 00:00:00 2001 From: Philipp Schirmer Date: Wed, 8 Jan 2025 21:41:23 +0100 Subject: [PATCH 1/7] Use Confluent MockSchemaRegistry --- gradle.properties | 2 +- streams-bootstrap-cli/build.gradle.kts | 8 ++---- .../kafka/integration/RunProducerAppTest.java | 10 ++----- streams-bootstrap-core/build.gradle.kts | 6 ---- .../bakdata/kafka/integration/KafkaTest.java | 16 +++++++---- .../ProducerCleanUpRunnerTest.java | 4 +-- .../integration/StreamsCleanUpRunnerTest.java | 26 +++++++---------- .../kafka/util/SchemaTopicClientTest.java | 28 +++++++++---------- 8 files changed, 42 insertions(+), 58 deletions(-) diff --git a/gradle.properties b/gradle.properties index bc437f05..4f340d48 100644 --- a/gradle.properties +++ b/gradle.properties @@ -5,7 +5,7 @@ org.gradle.parallel=false kafkaVersion=3.8.1 testContainersVersion=1.20.4 confluentVersion=7.8.0 -fluentKafkaVersion=2.16.0 +fluentKafkaVersion=2.16.1-SNAPSHOT junitVersion=5.11.4 mockitoVersion=5.15.2 assertJVersion=3.27.2 diff --git a/streams-bootstrap-cli/build.gradle.kts b/streams-bootstrap-cli/build.gradle.kts index 09391646..3da100df 100644 --- a/streams-bootstrap-cli/build.gradle.kts +++ b/streams-bootstrap-cli/build.gradle.kts @@ -19,12 +19,8 @@ dependencies { testImplementation(group = "org.mockito", name = "mockito-junit-jupiter", version = mockitoVersion) testImplementation(testFixtures(project(":streams-bootstrap-core"))) testImplementation(group = "com.ginsberg", name = "junit5-system-exit", version = "1.1.2") - val fluentKafkaVersion: String by project - testImplementation( - group = "com.bakdata.fluent-kafka-streams-tests", - name = "schema-registry-mock-junit5", - version = fluentKafkaVersion - ) val log4jVersion: String by project testImplementation(group = "org.apache.logging.log4j", name = "log4j-slf4j2-impl", version = log4jVersion) + val confluentVersion: String by project + testImplementation(group = "io.confluent", name = 
"kafka-streams-avro-serde", version = confluentVersion) } diff --git a/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/RunProducerAppTest.java b/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/RunProducerAppTest.java index d9d2ae56..40bff219 100644 --- a/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/RunProducerAppTest.java +++ b/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/RunProducerAppTest.java @@ -36,7 +36,6 @@ import com.bakdata.kafka.SimpleKafkaProducerApplication; import com.bakdata.kafka.TestRecord; import com.bakdata.kafka.util.ImprovedAdminClient; -import com.bakdata.schemaregistrymock.junit5.SchemaRegistryMockExtension; import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig; import io.confluent.kafka.streams.serdes.avro.SpecificAvroDeserializer; import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer; @@ -49,7 +48,6 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.RegisterExtension; import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Testcontainers; import org.testcontainers.kafka.KafkaContainer; @@ -57,8 +55,7 @@ @Testcontainers class RunProducerAppTest { private static final Duration TIMEOUT = Duration.ofSeconds(10); - @RegisterExtension - final SchemaRegistryMockExtension schemaRegistryMockExtension = new SchemaRegistryMockExtension(); + private static final String SCHEMA_REGISTRY_URL = "mock://"; @Container private final KafkaContainer kafkaCluster = newKafkaCluster(); @@ -92,15 +89,14 @@ public SerializerConfig defaultSerializationConfig() { } })) { app.setBootstrapServers(this.kafkaCluster.getBootstrapServers()); - app.setSchemaRegistryUrl(this.schemaRegistryMockExtension.getUrl()); + app.setSchemaRegistryUrl(SCHEMA_REGISTRY_URL); app.setOutputTopic(output); app.run(); final KafkaContainerHelper 
kafkaContainerHelper = new KafkaContainerHelper(this.kafkaCluster); assertThat(kafkaContainerHelper.read() .with(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class) .with(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, SpecificAvroDeserializer.class) - .with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, - this.schemaRegistryMockExtension.getUrl()) + .with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, SCHEMA_REGISTRY_URL) .from(output, TIMEOUT)) .hasSize(1) .anySatisfy(kv -> { diff --git a/streams-bootstrap-core/build.gradle.kts b/streams-bootstrap-core/build.gradle.kts index c1d46256..c85e0870 100644 --- a/streams-bootstrap-core/build.gradle.kts +++ b/streams-bootstrap-core/build.gradle.kts @@ -31,13 +31,7 @@ dependencies { testImplementation(group = "org.mockito", name = "mockito-core", version = mockitoVersion) testImplementation(group = "org.mockito", name = "mockito-junit-jupiter", version = mockitoVersion) - val fluentKafkaVersion: String by project testImplementation(project(":streams-bootstrap-test")) - testImplementation( - group = "com.bakdata.fluent-kafka-streams-tests", - name = "schema-registry-mock-junit5", - version = fluentKafkaVersion - ) val testContainersVersion: String by project testFixturesApi(group = "org.testcontainers", name = "junit-jupiter", version = testContainersVersion) testFixturesApi(group = "org.testcontainers", name = "kafka", version = testContainersVersion) diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/KafkaTest.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/KafkaTest.java index 52eeb988..b31b533f 100644 --- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/KafkaTest.java +++ b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/KafkaTest.java @@ -24,22 +24,28 @@ package com.bakdata.kafka.integration; +import static java.util.Collections.emptyMap; + import 
com.bakdata.kafka.KafkaContainerHelper; import com.bakdata.kafka.KafkaEndpointConfig; import com.bakdata.kafka.TestUtil; -import com.bakdata.schemaregistrymock.junit5.SchemaRegistryMockExtension; -import org.junit.jupiter.api.extension.RegisterExtension; +import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; +import io.confluent.kafka.schemaregistry.client.SchemaRegistryClientFactory; +import java.util.List; import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Testcontainers; import org.testcontainers.kafka.KafkaContainer; @Testcontainers abstract class KafkaTest { - @RegisterExtension - final SchemaRegistryMockExtension schemaRegistryMockExtension = new SchemaRegistryMockExtension(); + static final String SCHEMA_REGISTRY_URL = "mock://"; @Container private final KafkaContainer kafkaCluster = TestUtil.newKafkaCluster(); + static SchemaRegistryClient getSchemaRegistryClient() { + return SchemaRegistryClientFactory.newClient(List.of(SCHEMA_REGISTRY_URL), 0, null, emptyMap(), null); + } + KafkaEndpointConfig createEndpointWithoutSchemaRegistry() { return KafkaEndpointConfig.builder() .bootstrapServers(this.kafkaCluster.getBootstrapServers()) @@ -49,7 +55,7 @@ KafkaEndpointConfig createEndpointWithoutSchemaRegistry() { KafkaEndpointConfig createEndpoint() { return KafkaEndpointConfig.builder() .bootstrapServers(this.kafkaCluster.getBootstrapServers()) - .schemaRegistryUrl(this.schemaRegistryMockExtension.getUrl()) + .schemaRegistryUrl(SCHEMA_REGISTRY_URL) .build(); } diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/ProducerCleanUpRunnerTest.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/ProducerCleanUpRunnerTest.java index 5156ea60..0d9ead3d 100644 --- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/ProducerCleanUpRunnerTest.java +++ b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/ProducerCleanUpRunnerTest.java @@ 
-121,7 +121,7 @@ void shouldDeleteTopic() { void shouldDeleteValueSchema() throws IOException, RestClientException { try (final ConfiguredProducerApp app = createAvroValueApplication(); final ExecutableProducerApp executableApp = app.withEndpoint(this.createEndpoint()); - final SchemaRegistryClient client = this.schemaRegistryMockExtension.getSchemaRegistryClient()) { + final SchemaRegistryClient client = getSchemaRegistryClient()) { run(executableApp); final String outputTopic = app.getTopics().getOutputTopic(); @@ -137,7 +137,7 @@ void shouldDeleteValueSchema() throws IOException, RestClientException { void shouldDeleteKeySchema() throws IOException, RestClientException { try (final ConfiguredProducerApp app = createAvroKeyApplication(); final ExecutableProducerApp executableApp = app.withEndpoint(this.createEndpoint()); - final SchemaRegistryClient client = this.schemaRegistryMockExtension.getSchemaRegistryClient()) { + final SchemaRegistryClient client = getSchemaRegistryClient()) { run(executableApp); final String outputTopic = app.getTopics().getOutputTopic(); diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpRunnerTest.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpRunnerTest.java index 2b1162ed..41ed5c0c 100644 --- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpRunnerTest.java +++ b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpRunnerTest.java @@ -283,8 +283,7 @@ void shouldDeleteInternalTopics() throws InterruptedException { .createTopic(app.getTopics().getOutputTopic(), DEFAULT_TOPIC_SETTINGS, emptyMap()); } kafkaContainerHelper.send() - .with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, - this.schemaRegistryMockExtension.getUrl()) + .with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, SCHEMA_REGISTRY_URL) .with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class.getName()) .with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName()) .to(app.getTopics().getInputTopics().get(0), List.of( @@ -336,8 +335,7 @@ void shouldDeleteIntermediateTopics() throws InterruptedException { .createTopic(app.getTopics().getOutputTopic(), DEFAULT_TOPIC_SETTINGS, emptyMap()); } kafkaContainerHelper.send() - .with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, - this.schemaRegistryMockExtension.getUrl()) + .with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, SCHEMA_REGISTRY_URL) .with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()) .with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName()) .to(app.getTopics().getInputTopics().get(0), List.of( @@ -443,7 +441,7 @@ void shouldDeleteValueSchema() throws InterruptedException, IOException, RestClientException { try (final ConfiguredStreamsApp app = createMirrorValueApplication(); final ExecutableStreamsApp executableApp = app.withEndpoint(this.createEndpoint()); - final SchemaRegistryClient client = this.schemaRegistryMockExtension.getSchemaRegistryClient()) { + final SchemaRegistryClient client = getSchemaRegistryClient()) { final TestRecord testRecord = TestRecord.newBuilder().setContent("key 1").build(); final String inputTopic = app.getTopics().getInputTopics().get(0); final KafkaContainerHelper kafkaContainerHelper = this.newContainerHelper(); @@ -452,8 +450,7 @@ void shouldDeleteValueSchema() .createTopic(app.getTopics().getOutputTopic(), DEFAULT_TOPIC_SETTINGS, emptyMap()); } kafkaContainerHelper.send() - .with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, - this.schemaRegistryMockExtension.getUrl()) + .with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, SCHEMA_REGISTRY_URL) .with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName()) .to(app.getTopics().getInputTopics().get(0), List.of( new KeyValue<>(null, 
testRecord) @@ -478,7 +475,7 @@ void shouldDeleteKeySchema() throws InterruptedException, IOException, RestClientException { try (final ConfiguredStreamsApp app = createMirrorKeyApplication(); final ExecutableStreamsApp executableApp = app.withEndpoint(this.createEndpoint()); - final SchemaRegistryClient client = this.schemaRegistryMockExtension.getSchemaRegistryClient()) { + final SchemaRegistryClient client = getSchemaRegistryClient()) { final TestRecord testRecord = TestRecord.newBuilder().setContent("key 1").build(); final String inputTopic = app.getTopics().getInputTopics().get(0); final KafkaContainerHelper kafkaContainerHelper = this.newContainerHelper(); @@ -487,8 +484,7 @@ void shouldDeleteKeySchema() .createTopic(app.getTopics().getOutputTopic(), DEFAULT_TOPIC_SETTINGS, emptyMap()); } kafkaContainerHelper.send() - .with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, - this.schemaRegistryMockExtension.getUrl()) + .with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, SCHEMA_REGISTRY_URL) .with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName()) .to(app.getTopics().getInputTopics().get(0), List.of( new KeyValue<>(testRecord, "val") @@ -513,7 +509,7 @@ void shouldDeleteSchemaOfInternalTopics() throws InterruptedException, IOException, RestClientException { try (final ConfiguredStreamsApp app = this.createComplexApplication(); final ExecutableStreamsApp executableApp = app.withEndpoint(this.createEndpoint()); - final SchemaRegistryClient client = this.schemaRegistryMockExtension.getSchemaRegistryClient()) { + final SchemaRegistryClient client = getSchemaRegistryClient()) { final TestRecord testRecord = TestRecord.newBuilder().setContent("key 1").build(); final String inputTopic = app.getTopics().getInputTopics().get(0); final KafkaContainerHelper kafkaContainerHelper = this.newContainerHelper(); @@ -522,8 +518,7 @@ void shouldDeleteSchemaOfInternalTopics() .createTopic(app.getTopics().getOutputTopic(), 
DEFAULT_TOPIC_SETTINGS, emptyMap()); } kafkaContainerHelper.send() - .with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, - this.schemaRegistryMockExtension.getUrl()) + .with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, SCHEMA_REGISTRY_URL) .with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()) .with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName()) .to(app.getTopics().getInputTopics().get(0), List.of( @@ -556,7 +551,7 @@ void shouldDeleteSchemaOfIntermediateTopics() throws InterruptedException, IOException, RestClientException { try (final ConfiguredStreamsApp app = this.createComplexApplication(); final ExecutableStreamsApp executableApp = app.withEndpoint(this.createEndpoint()); - final SchemaRegistryClient client = this.schemaRegistryMockExtension.getSchemaRegistryClient()) { + final SchemaRegistryClient client = getSchemaRegistryClient()) { final TestRecord testRecord = TestRecord.newBuilder().setContent("key 1").build(); final String inputTopic = app.getTopics().getInputTopics().get(0); final KafkaContainerHelper kafkaContainerHelper = this.newContainerHelper(); @@ -565,8 +560,7 @@ void shouldDeleteSchemaOfIntermediateTopics() .createTopic(app.getTopics().getOutputTopic(), DEFAULT_TOPIC_SETTINGS, emptyMap()); } kafkaContainerHelper.send() - .with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, - this.schemaRegistryMockExtension.getUrl()) + .with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, SCHEMA_REGISTRY_URL) .with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()) .with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName()) .to(app.getTopics().getInputTopics().get(0), List.of( diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/util/SchemaTopicClientTest.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/util/SchemaTopicClientTest.java index 909c85b7..3eba804e 
100644 --- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/util/SchemaTopicClientTest.java +++ b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/util/SchemaTopicClientTest.java @@ -31,8 +31,8 @@ import com.bakdata.kafka.KafkaContainerHelper; import com.bakdata.kafka.TestRecord; -import com.bakdata.schemaregistrymock.junit5.SchemaRegistryMockExtension; import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; +import io.confluent.kafka.schemaregistry.client.SchemaRegistryClientFactory; import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException; import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig; import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer; @@ -49,7 +49,6 @@ import org.assertj.core.api.junit.jupiter.SoftAssertionsExtension; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; -import org.junit.jupiter.api.extension.RegisterExtension; import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Testcontainers; import org.testcontainers.kafka.KafkaContainer; @@ -58,16 +57,19 @@ @Slf4j @ExtendWith(SoftAssertionsExtension.class) class SchemaTopicClientTest { + private static final String SCHEMA_REGISTRY_URL = "mock://"; private static final Duration TIMEOUT = Duration.ofSeconds(10); private static final String TOPIC = "topic"; - @RegisterExtension - final SchemaRegistryMockExtension schemaRegistryMockExtension = new SchemaRegistryMockExtension(); @Container private final KafkaContainer kafkaCluster = newKafkaCluster(); @InjectSoftAssertions SoftAssertions softly; + private static SchemaRegistryClient getSchemaRegistryClient() { + return SchemaRegistryClientFactory.newClient(List.of(SCHEMA_REGISTRY_URL), 0, null, emptyMap(), null); + } + @Test void shouldDeleteTopicAndSchemaWhenSchemaRegistryUrlIsSet() throws InterruptedException, IOException, RestClientException { @@ -81,13 +83,12 @@ void 
shouldDeleteTopicAndSchemaWhenSchemaRegistryUrlIsSet() kafkaContainerHelper.send() .with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, SpecificAvroSerializer.class) - .with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, - this.schemaRegistryMockExtension.getUrl()) + .with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, SCHEMA_REGISTRY_URL) .to(TOPIC, List.of( new KeyValue<>(null, TestRecord.newBuilder().setContent("foo").build()) )); - final SchemaRegistryClient client = this.schemaRegistryMockExtension.getSchemaRegistryClient(); + final SchemaRegistryClient client = getSchemaRegistryClient(); this.softly.assertThat(client.getAllSubjects()) .contains(TOPIC + "-value"); @@ -116,13 +117,12 @@ void shouldResetSchema() throws InterruptedException, IOException, RestClientExc kafkaContainerHelper.send() .with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, SpecificAvroSerializer.class) - .with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, - this.schemaRegistryMockExtension.getUrl()) + .with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, SCHEMA_REGISTRY_URL) .to(TOPIC, List.of( new KeyValue<>(null, TestRecord.newBuilder().setContent("foo").build()) )); - final SchemaRegistryClient client = this.schemaRegistryMockExtension.getSchemaRegistryClient(); + final SchemaRegistryClient client = getSchemaRegistryClient(); this.softly.assertThat(client.getAllSubjects()) .contains(TOPIC + "-value"); @@ -152,13 +152,12 @@ void shouldDeleteTopicAndKeepSchemaWhenSchemaRegistryUrlIsNotSet() throws Interr kafkaContainerHelper.send() .with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, SpecificAvroSerializer.class) - .with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, - this.schemaRegistryMockExtension.getUrl()) + .with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, SCHEMA_REGISTRY_URL) .to(TOPIC, List.of( new KeyValue<>(null, TestRecord.newBuilder().setContent("foo").build()) )); - final SchemaRegistryClient client = 
this.schemaRegistryMockExtension.getSchemaRegistryClient(); + final SchemaRegistryClient client = getSchemaRegistryClient(); this.softly.assertThat(client.getAllSubjects()) .contains(TOPIC + "-value"); @@ -178,8 +177,7 @@ private SchemaTopicClient createClientWithSchemaRegistry() { final Map kafkaProperties = Map.of( AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, this.kafkaCluster.getBootstrapServers() ); - return SchemaTopicClient.create(kafkaProperties, this.schemaRegistryMockExtension.getUrl(), - TIMEOUT); + return SchemaTopicClient.create(kafkaProperties, SCHEMA_REGISTRY_URL, TIMEOUT); } private SchemaTopicClient createClientWithNoSchemaRegistry() { From 70aaa1a617c21086939757bd8c409286808eaa12 Mon Sep 17 00:00:00 2001 From: Philipp Schirmer Date: Thu, 9 Jan 2025 07:15:41 +0100 Subject: [PATCH 2/7] Use Confluent MockSchemaRegistry --- .github/workflows/build-and-publish.yaml | 1 + streams-bootstrap-cli/build.gradle.kts | 4 +-- .../bakdata/kafka/TestTopologyFactory.java | 29 +++++++++++-------- 3 files changed, 20 insertions(+), 14 deletions(-) diff --git a/.github/workflows/build-and-publish.yaml b/.github/workflows/build-and-publish.yaml index 8074368f..32117779 100644 --- a/.github/workflows/build-and-publish.yaml +++ b/.github/workflows/build-and-publish.yaml @@ -11,6 +11,7 @@ jobs: uses: bakdata/ci-templates/.github/workflows/java-gradle-library.yaml@1.43.0 with: java-version: 17 + gradle-refresh-dependencies: true # TODO remove secrets: sonar-token: ${{ secrets.SONARCLOUD_TOKEN }} sonar-organization: ${{ secrets.SONARCLOUD_ORGANIZATION }} diff --git a/streams-bootstrap-cli/build.gradle.kts b/streams-bootstrap-cli/build.gradle.kts index 3da100df..7cc8ba99 100644 --- a/streams-bootstrap-cli/build.gradle.kts +++ b/streams-bootstrap-cli/build.gradle.kts @@ -19,8 +19,8 @@ dependencies { testImplementation(group = "org.mockito", name = "mockito-junit-jupiter", version = mockitoVersion) testImplementation(testFixtures(project(":streams-bootstrap-core"))) 
testImplementation(group = "com.ginsberg", name = "junit5-system-exit", version = "1.1.2") - val log4jVersion: String by project - testImplementation(group = "org.apache.logging.log4j", name = "log4j-slf4j2-impl", version = log4jVersion) val confluentVersion: String by project testImplementation(group = "io.confluent", name = "kafka-streams-avro-serde", version = confluentVersion) + val log4jVersion: String by project + testImplementation(group = "org.apache.logging.log4j", name = "log4j-slf4j2-impl", version = log4jVersion) } diff --git a/streams-bootstrap-test/src/main/java/com/bakdata/kafka/TestTopologyFactory.java b/streams-bootstrap-test/src/main/java/com/bakdata/kafka/TestTopologyFactory.java index c87e8cd7..dc29bdfe 100644 --- a/streams-bootstrap-test/src/main/java/com/bakdata/kafka/TestTopologyFactory.java +++ b/streams-bootstrap-test/src/main/java/com/bakdata/kafka/TestTopologyFactory.java @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2024 bakdata + * Copyright (c) 2025 bakdata * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -24,11 +24,15 @@ package com.bakdata.kafka; +import static java.util.Collections.emptyMap; + import com.bakdata.fluent_kafka_streams_tests.TestTopology; import com.bakdata.fluent_kafka_streams_tests.junit5.TestTopologyExtension; import com.bakdata.kafka.KafkaEndpointConfig.KafkaEndpointConfigBuilder; +import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; +import io.confluent.kafka.schemaregistry.client.SchemaRegistryClientFactory; +import java.util.List; import java.util.Map; -import java.util.function.Function; import lombok.experimental.UtilityClass; /** @@ -37,6 +41,12 @@ @UtilityClass public class TestTopologyFactory { + public static final String SCHEMA_REGISTRY_URL = "mock://"; + + public static SchemaRegistryClient getSchemaRegistryClient() { + return 
SchemaRegistryClientFactory.newClient(List.of(SCHEMA_REGISTRY_URL), 0, null, emptyMap(), null); + } + /** * Create a {@code TestTopology} from a {@code ConfiguredStreamsApp}. It injects are {@link KafkaEndpointConfig} * with configured Schema Registry. @@ -110,9 +120,12 @@ public static TestTopologyExtension createTopologyExtension( * @return Kafka properties * @see ConfiguredStreamsApp#getKafkaProperties(KafkaEndpointConfig) */ - public static Function> getKafkaPropertiesWithSchemaRegistryUrl( + public static Map getKafkaPropertiesWithSchemaRegistryUrl( final ConfiguredStreamsApp app) { - return schemaRegistryUrl -> getKafkaProperties(app, schemaRegistryUrl); + final KafkaEndpointConfig endpointConfig = newEndpointConfig() + .schemaRegistryUrl(SCHEMA_REGISTRY_URL) + .build(); + return app.getKafkaProperties(endpointConfig); } /** @@ -126,14 +139,6 @@ public static Configurator createConfigurator(final TestTopology testTopol return new Configurator(testTopology.getProperties()); } - private static Map getKafkaProperties(final ConfiguredStreamsApp app, - final String schemaRegistryUrl) { - final KafkaEndpointConfig endpointConfig = newEndpointConfig() - .schemaRegistryUrl(schemaRegistryUrl) - .build(); - return app.getKafkaProperties(endpointConfig); - } - private static Map getKafkaProperties(final ConfiguredStreamsApp app) { final KafkaEndpointConfig endpointConfig = newEndpointConfig() .build(); From e036f9d87e8b99dd1e0f2b89b0ec84d7eb44fefc Mon Sep 17 00:00:00 2001 From: Philipp Schirmer Date: Thu, 9 Jan 2025 07:45:52 +0100 Subject: [PATCH 3/7] Use Confluent MockSchemaRegistry --- .../bakdata/kafka/integration/KafkaTest.java | 13 +---- .../ProducerCleanUpRunnerTest.java | 1 + .../integration/StreamsCleanUpRunnerTest.java | 55 ++++++++++--------- .../kafka/util/SchemaTopicClientTest.java | 8 +-- 4 files changed, 34 insertions(+), 43 deletions(-) diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/KafkaTest.java 
b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/KafkaTest.java index b31b533f..89ab552c 100644 --- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/KafkaTest.java +++ b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/KafkaTest.java @@ -24,28 +24,19 @@ package com.bakdata.kafka.integration; -import static java.util.Collections.emptyMap; - import com.bakdata.kafka.KafkaContainerHelper; import com.bakdata.kafka.KafkaEndpointConfig; +import com.bakdata.kafka.TestTopologyFactory; import com.bakdata.kafka.TestUtil; -import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; -import io.confluent.kafka.schemaregistry.client.SchemaRegistryClientFactory; -import java.util.List; import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Testcontainers; import org.testcontainers.kafka.KafkaContainer; @Testcontainers abstract class KafkaTest { - static final String SCHEMA_REGISTRY_URL = "mock://"; @Container private final KafkaContainer kafkaCluster = TestUtil.newKafkaCluster(); - static SchemaRegistryClient getSchemaRegistryClient() { - return SchemaRegistryClientFactory.newClient(List.of(SCHEMA_REGISTRY_URL), 0, null, emptyMap(), null); - } - KafkaEndpointConfig createEndpointWithoutSchemaRegistry() { return KafkaEndpointConfig.builder() .bootstrapServers(this.kafkaCluster.getBootstrapServers()) @@ -55,7 +46,7 @@ KafkaEndpointConfig createEndpointWithoutSchemaRegistry() { KafkaEndpointConfig createEndpoint() { return KafkaEndpointConfig.builder() .bootstrapServers(this.kafkaCluster.getBootstrapServers()) - .schemaRegistryUrl(SCHEMA_REGISTRY_URL) + .schemaRegistryUrl(TestTopologyFactory.SCHEMA_REGISTRY_URL) .build(); } diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/ProducerCleanUpRunnerTest.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/ProducerCleanUpRunnerTest.java index 0d9ead3d..24429676 100644 --- 
a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/ProducerCleanUpRunnerTest.java +++ b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/ProducerCleanUpRunnerTest.java @@ -25,6 +25,7 @@ package com.bakdata.kafka.integration; +import static com.bakdata.kafka.TestTopologyFactory.getSchemaRegistryClient; import static com.bakdata.kafka.integration.ProducerRunnerTest.configureApp; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpRunnerTest.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpRunnerTest.java index 41ed5c0c..0f5864d2 100644 --- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpRunnerTest.java +++ b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpRunnerTest.java @@ -26,6 +26,8 @@ import static com.bakdata.kafka.KafkaContainerHelper.DEFAULT_TOPIC_SETTINGS; +import static com.bakdata.kafka.TestTopologyFactory.SCHEMA_REGISTRY_URL; +import static com.bakdata.kafka.TestTopologyFactory.getSchemaRegistryClient; import static com.bakdata.kafka.integration.StreamsRunnerTest.configureApp; import static java.util.Collections.emptyMap; import static org.mockito.Mockito.verify; @@ -474,33 +476,34 @@ void shouldDeleteValueSchema() void shouldDeleteKeySchema() throws InterruptedException, IOException, RestClientException { try (final ConfiguredStreamsApp app = createMirrorKeyApplication(); - final ExecutableStreamsApp executableApp = app.withEndpoint(this.createEndpoint()); - final SchemaRegistryClient client = getSchemaRegistryClient()) { - final TestRecord testRecord = TestRecord.newBuilder().setContent("key 1").build(); - final String inputTopic = app.getTopics().getInputTopics().get(0); - final KafkaContainerHelper kafkaContainerHelper = this.newContainerHelper(); - try (final 
ImprovedAdminClient admin = kafkaContainerHelper.admin()) { - admin.getTopicClient() - .createTopic(app.getTopics().getOutputTopic(), DEFAULT_TOPIC_SETTINGS, emptyMap()); + final ExecutableStreamsApp executableApp = app.withEndpoint(this.createEndpoint())) { + try (final SchemaRegistryClient client = getSchemaRegistryClient()) { + final TestRecord testRecord = TestRecord.newBuilder().setContent("key 1").build(); + final String inputTopic = app.getTopics().getInputTopics().get(0); + final KafkaContainerHelper kafkaContainerHelper = this.newContainerHelper(); + try (final ImprovedAdminClient admin = kafkaContainerHelper.admin()) { + admin.getTopicClient() + .createTopic(app.getTopics().getOutputTopic(), DEFAULT_TOPIC_SETTINGS, emptyMap()); + } + kafkaContainerHelper.send() + .with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, SCHEMA_REGISTRY_URL) + .with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName()) + .to(app.getTopics().getInputTopics().get(0), List.of( + new KeyValue<>(testRecord, "val") + )); + + run(executableApp); + + // Wait until all stream application are completely stopped before triggering cleanup + Thread.sleep(TIMEOUT.toMillis()); + final String outputTopic = app.getTopics().getOutputTopic(); + this.softly.assertThat(client.getAllSubjects()) + .contains(outputTopic + "-key", inputTopic + "-key"); + clean(executableApp); + this.softly.assertThat(client.getAllSubjects()) + .doesNotContain(outputTopic + "-key") + .contains(inputTopic + "-key"); } - kafkaContainerHelper.send() - .with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, SCHEMA_REGISTRY_URL) - .with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName()) - .to(app.getTopics().getInputTopics().get(0), List.of( - new KeyValue<>(testRecord, "val") - )); - - run(executableApp); - - // Wait until all stream application are completely stopped before triggering cleanup - Thread.sleep(TIMEOUT.toMillis()); - final String 
outputTopic = app.getTopics().getOutputTopic(); - this.softly.assertThat(client.getAllSubjects()) - .contains(outputTopic + "-key", inputTopic + "-key"); - clean(executableApp); - this.softly.assertThat(client.getAllSubjects()) - .doesNotContain(outputTopic + "-key") - .contains(inputTopic + "-key"); } } diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/util/SchemaTopicClientTest.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/util/SchemaTopicClientTest.java index 3eba804e..2e1b5c26 100644 --- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/util/SchemaTopicClientTest.java +++ b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/util/SchemaTopicClientTest.java @@ -26,13 +26,14 @@ import static com.bakdata.kafka.KafkaContainerHelper.DEFAULT_TOPIC_SETTINGS; +import static com.bakdata.kafka.TestTopologyFactory.SCHEMA_REGISTRY_URL; +import static com.bakdata.kafka.TestTopologyFactory.getSchemaRegistryClient; import static com.bakdata.kafka.TestUtil.newKafkaCluster; import static java.util.Collections.emptyMap; import com.bakdata.kafka.KafkaContainerHelper; import com.bakdata.kafka.TestRecord; import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; -import io.confluent.kafka.schemaregistry.client.SchemaRegistryClientFactory; import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException; import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig; import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer; @@ -57,7 +58,6 @@ @Slf4j @ExtendWith(SoftAssertionsExtension.class) class SchemaTopicClientTest { - private static final String SCHEMA_REGISTRY_URL = "mock://"; private static final Duration TIMEOUT = Duration.ofSeconds(10); private static final String TOPIC = "topic"; @Container @@ -66,10 +66,6 @@ class SchemaTopicClientTest { @InjectSoftAssertions SoftAssertions softly; - private static SchemaRegistryClient getSchemaRegistryClient() { - return 
SchemaRegistryClientFactory.newClient(List.of(SCHEMA_REGISTRY_URL), 0, null, emptyMap(), null); - } - @Test void shouldDeleteTopicAndSchemaWhenSchemaRegistryUrlIsSet() throws InterruptedException, IOException, RestClientException { From 8416e8d85abe507fbd31083acccd2a6d4a2ff6f6 Mon Sep 17 00:00:00 2001 From: Philipp Schirmer Date: Thu, 9 Jan 2025 07:48:18 +0100 Subject: [PATCH 4/7] Use Confluent MockSchemaRegistry --- .../integration/StreamsCleanUpRunnerTest.java | 53 +++++++++---------- 1 file changed, 26 insertions(+), 27 deletions(-) diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpRunnerTest.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpRunnerTest.java index 0f5864d2..8a43a7df 100644 --- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpRunnerTest.java +++ b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpRunnerTest.java @@ -476,34 +476,33 @@ void shouldDeleteValueSchema() void shouldDeleteKeySchema() throws InterruptedException, IOException, RestClientException { try (final ConfiguredStreamsApp app = createMirrorKeyApplication(); - final ExecutableStreamsApp executableApp = app.withEndpoint(this.createEndpoint())) { - try (final SchemaRegistryClient client = getSchemaRegistryClient()) { - final TestRecord testRecord = TestRecord.newBuilder().setContent("key 1").build(); - final String inputTopic = app.getTopics().getInputTopics().get(0); - final KafkaContainerHelper kafkaContainerHelper = this.newContainerHelper(); - try (final ImprovedAdminClient admin = kafkaContainerHelper.admin()) { - admin.getTopicClient() - .createTopic(app.getTopics().getOutputTopic(), DEFAULT_TOPIC_SETTINGS, emptyMap()); - } - kafkaContainerHelper.send() - .with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, SCHEMA_REGISTRY_URL) - .with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName()) - 
.to(app.getTopics().getInputTopics().get(0), List.of( - new KeyValue<>(testRecord, "val") - )); - - run(executableApp); - - // Wait until all stream application are completely stopped before triggering cleanup - Thread.sleep(TIMEOUT.toMillis()); - final String outputTopic = app.getTopics().getOutputTopic(); - this.softly.assertThat(client.getAllSubjects()) - .contains(outputTopic + "-key", inputTopic + "-key"); - clean(executableApp); - this.softly.assertThat(client.getAllSubjects()) - .doesNotContain(outputTopic + "-key") - .contains(inputTopic + "-key"); + final ExecutableStreamsApp executableApp = app.withEndpoint(this.createEndpoint()); + final SchemaRegistryClient client = getSchemaRegistryClient()) { + final TestRecord testRecord = TestRecord.newBuilder().setContent("key 1").build(); + final String inputTopic = app.getTopics().getInputTopics().get(0); + final KafkaContainerHelper kafkaContainerHelper = this.newContainerHelper(); + try (final ImprovedAdminClient admin = kafkaContainerHelper.admin()) { + admin.getTopicClient() + .createTopic(app.getTopics().getOutputTopic(), DEFAULT_TOPIC_SETTINGS, emptyMap()); } + kafkaContainerHelper.send() + .with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, SCHEMA_REGISTRY_URL) + .with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName()) + .to(app.getTopics().getInputTopics().get(0), List.of( + new KeyValue<>(testRecord, "val") + )); + + run(executableApp); + + // Wait until all stream application are completely stopped before triggering cleanup + Thread.sleep(TIMEOUT.toMillis()); + final String outputTopic = app.getTopics().getOutputTopic(); + this.softly.assertThat(client.getAllSubjects()) + .contains(outputTopic + "-key", inputTopic + "-key"); + clean(executableApp); + this.softly.assertThat(client.getAllSubjects()) + .doesNotContain(outputTopic + "-key") + .contains(inputTopic + "-key"); } } From c1bc5c5d5fc642e0d5dc67a415a2fd55124cc9e9 Mon Sep 17 00:00:00 2001 From: Philipp 
Schirmer Date: Thu, 9 Jan 2025 09:19:24 +0100 Subject: [PATCH 5/7] Use Confluent MockSchemaRegistry --- .../com/bakdata/kafka/AvroMirrorTest.java | 4 +- .../bakdata/kafka/integration/KafkaTest.java | 12 +- .../ProducerCleanUpRunnerTest.java | 1 - .../integration/StreamsCleanUpRunnerTest.java | 14 +- .../kafka/util/SchemaTopicClientTest.java | 20 ++- .../bakdata/kafka/TestTopologyFactory.java | 121 ++++++++---------- 6 files changed, 83 insertions(+), 89 deletions(-) diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/AvroMirrorTest.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/AvroMirrorTest.java index 1f3d1c6d..553fde3b 100644 --- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/AvroMirrorTest.java +++ b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/AvroMirrorTest.java @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2024 bakdata + * Copyright (c) 2025 bakdata * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -35,7 +35,7 @@ class AvroMirrorTest { private final ConfiguredStreamsApp app = createApp(); @RegisterExtension final TestTopologyExtension testTopology = - TestTopologyFactory.createTopologyExtensionWithSchemaRegistry(this.app); + TestTopologyFactory.withSchemaRegistry().createTopologyExtension(this.app); private static ConfiguredStreamsApp createApp() { final AppConfiguration configuration = new AppConfiguration<>(StreamsTopicConfig.builder() diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/KafkaTest.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/KafkaTest.java index 89ab552c..325a7d6f 100644 --- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/KafkaTest.java +++ b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/KafkaTest.java @@ -28,15 +28,25 @@ import com.bakdata.kafka.KafkaEndpointConfig; import 
com.bakdata.kafka.TestTopologyFactory; import com.bakdata.kafka.TestUtil; +import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Testcontainers; import org.testcontainers.kafka.KafkaContainer; @Testcontainers abstract class KafkaTest { + private static final TestTopologyFactory TEST_TOPOLOGY_FACTORY = TestTopologyFactory.withSchemaRegistry(); @Container private final KafkaContainer kafkaCluster = TestUtil.newKafkaCluster(); + static String getSchemaRegistryUrl() { + return TEST_TOPOLOGY_FACTORY.getSchemaRegistryUrl(); + } + + static SchemaRegistryClient getSchemaRegistryClient() { + return TEST_TOPOLOGY_FACTORY.getSchemaRegistryClient(); + } + KafkaEndpointConfig createEndpointWithoutSchemaRegistry() { return KafkaEndpointConfig.builder() .bootstrapServers(this.kafkaCluster.getBootstrapServers()) @@ -46,7 +56,7 @@ KafkaEndpointConfig createEndpointWithoutSchemaRegistry() { KafkaEndpointConfig createEndpoint() { return KafkaEndpointConfig.builder() .bootstrapServers(this.kafkaCluster.getBootstrapServers()) - .schemaRegistryUrl(TestTopologyFactory.SCHEMA_REGISTRY_URL) + .schemaRegistryUrl(getSchemaRegistryUrl()) .build(); } diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/ProducerCleanUpRunnerTest.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/ProducerCleanUpRunnerTest.java index 24429676..0d9ead3d 100644 --- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/ProducerCleanUpRunnerTest.java +++ b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/ProducerCleanUpRunnerTest.java @@ -25,7 +25,6 @@ package com.bakdata.kafka.integration; -import static com.bakdata.kafka.TestTopologyFactory.getSchemaRegistryClient; import static com.bakdata.kafka.integration.ProducerRunnerTest.configureApp; import static org.mockito.Mockito.verify; import static 
org.mockito.Mockito.verifyNoMoreInteractions; diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpRunnerTest.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpRunnerTest.java index 8a43a7df..78421620 100644 --- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpRunnerTest.java +++ b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpRunnerTest.java @@ -26,8 +26,6 @@ import static com.bakdata.kafka.KafkaContainerHelper.DEFAULT_TOPIC_SETTINGS; -import static com.bakdata.kafka.TestTopologyFactory.SCHEMA_REGISTRY_URL; -import static com.bakdata.kafka.TestTopologyFactory.getSchemaRegistryClient; import static com.bakdata.kafka.integration.StreamsRunnerTest.configureApp; import static java.util.Collections.emptyMap; import static org.mockito.Mockito.verify; @@ -285,7 +283,7 @@ void shouldDeleteInternalTopics() throws InterruptedException { .createTopic(app.getTopics().getOutputTopic(), DEFAULT_TOPIC_SETTINGS, emptyMap()); } kafkaContainerHelper.send() - .with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, SCHEMA_REGISTRY_URL) + .with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, getSchemaRegistryUrl()) .with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()) .with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName()) .to(app.getTopics().getInputTopics().get(0), List.of( @@ -337,7 +335,7 @@ void shouldDeleteIntermediateTopics() throws InterruptedException { .createTopic(app.getTopics().getOutputTopic(), DEFAULT_TOPIC_SETTINGS, emptyMap()); } kafkaContainerHelper.send() - .with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, SCHEMA_REGISTRY_URL) + .with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, getSchemaRegistryUrl()) .with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()) 
.with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName()) .to(app.getTopics().getInputTopics().get(0), List.of( @@ -452,7 +450,7 @@ void shouldDeleteValueSchema() .createTopic(app.getTopics().getOutputTopic(), DEFAULT_TOPIC_SETTINGS, emptyMap()); } kafkaContainerHelper.send() - .with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, SCHEMA_REGISTRY_URL) + .with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, getSchemaRegistryUrl()) .with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName()) .to(app.getTopics().getInputTopics().get(0), List.of( new KeyValue<>(null, testRecord) @@ -486,7 +484,7 @@ void shouldDeleteKeySchema() .createTopic(app.getTopics().getOutputTopic(), DEFAULT_TOPIC_SETTINGS, emptyMap()); } kafkaContainerHelper.send() - .with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, SCHEMA_REGISTRY_URL) + .with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, getSchemaRegistryUrl()) .with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName()) .to(app.getTopics().getInputTopics().get(0), List.of( new KeyValue<>(testRecord, "val") @@ -520,7 +518,7 @@ void shouldDeleteSchemaOfInternalTopics() .createTopic(app.getTopics().getOutputTopic(), DEFAULT_TOPIC_SETTINGS, emptyMap()); } kafkaContainerHelper.send() - .with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, SCHEMA_REGISTRY_URL) + .with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, getSchemaRegistryUrl()) .with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()) .with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName()) .to(app.getTopics().getInputTopics().get(0), List.of( @@ -562,7 +560,7 @@ void shouldDeleteSchemaOfIntermediateTopics() .createTopic(app.getTopics().getOutputTopic(), DEFAULT_TOPIC_SETTINGS, emptyMap()); } kafkaContainerHelper.send() - 
.with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, SCHEMA_REGISTRY_URL) + .with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, getSchemaRegistryUrl()) .with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()) .with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName()) .to(app.getTopics().getInputTopics().get(0), List.of( diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/util/SchemaTopicClientTest.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/util/SchemaTopicClientTest.java index 2e1b5c26..bbf08fd0 100644 --- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/util/SchemaTopicClientTest.java +++ b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/util/SchemaTopicClientTest.java @@ -26,13 +26,12 @@ import static com.bakdata.kafka.KafkaContainerHelper.DEFAULT_TOPIC_SETTINGS; -import static com.bakdata.kafka.TestTopologyFactory.SCHEMA_REGISTRY_URL; -import static com.bakdata.kafka.TestTopologyFactory.getSchemaRegistryClient; import static com.bakdata.kafka.TestUtil.newKafkaCluster; import static java.util.Collections.emptyMap; import com.bakdata.kafka.KafkaContainerHelper; import com.bakdata.kafka.TestRecord; +import com.bakdata.kafka.TestTopologyFactory; import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException; import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig; @@ -60,12 +59,21 @@ class SchemaTopicClientTest { private static final Duration TIMEOUT = Duration.ofSeconds(10); private static final String TOPIC = "topic"; + private static final TestTopologyFactory TEST_TOPOLOGY_FACTORY = TestTopologyFactory.withSchemaRegistry(); @Container private final KafkaContainer kafkaCluster = newKafkaCluster(); @InjectSoftAssertions SoftAssertions softly; + private static String getSchemaRegistryUrl() { + return 
TEST_TOPOLOGY_FACTORY.getSchemaRegistryUrl(); + } + + private static SchemaRegistryClient getSchemaRegistryClient() { + return TEST_TOPOLOGY_FACTORY.getSchemaRegistryClient(); + } + @Test void shouldDeleteTopicAndSchemaWhenSchemaRegistryUrlIsSet() throws InterruptedException, IOException, RestClientException { @@ -79,7 +87,7 @@ void shouldDeleteTopicAndSchemaWhenSchemaRegistryUrlIsSet() kafkaContainerHelper.send() .with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, SpecificAvroSerializer.class) - .with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, SCHEMA_REGISTRY_URL) + .with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, getSchemaRegistryUrl()) .to(TOPIC, List.of( new KeyValue<>(null, TestRecord.newBuilder().setContent("foo").build()) )); @@ -113,7 +121,7 @@ void shouldResetSchema() throws InterruptedException, IOException, RestClientExc kafkaContainerHelper.send() .with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, SpecificAvroSerializer.class) - .with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, SCHEMA_REGISTRY_URL) + .with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, getSchemaRegistryUrl()) .to(TOPIC, List.of( new KeyValue<>(null, TestRecord.newBuilder().setContent("foo").build()) )); @@ -148,7 +156,7 @@ void shouldDeleteTopicAndKeepSchemaWhenSchemaRegistryUrlIsNotSet() throws Interr kafkaContainerHelper.send() .with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, SpecificAvroSerializer.class) - .with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, SCHEMA_REGISTRY_URL) + .with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, getSchemaRegistryUrl()) .to(TOPIC, List.of( new KeyValue<>(null, TestRecord.newBuilder().setContent("foo").build()) )); @@ -173,7 +181,7 @@ private SchemaTopicClient createClientWithSchemaRegistry() { final Map kafkaProperties = Map.of( AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, this.kafkaCluster.getBootstrapServers() ); - return 
SchemaTopicClient.create(kafkaProperties, SCHEMA_REGISTRY_URL, TIMEOUT); + return SchemaTopicClient.create(kafkaProperties, getSchemaRegistryUrl(), TIMEOUT); } private SchemaTopicClient createClientWithNoSchemaRegistry() { diff --git a/streams-bootstrap-test/src/main/java/com/bakdata/kafka/TestTopologyFactory.java b/streams-bootstrap-test/src/main/java/com/bakdata/kafka/TestTopologyFactory.java index dc29bdfe..8c416ac4 100644 --- a/streams-bootstrap-test/src/main/java/com/bakdata/kafka/TestTopologyFactory.java +++ b/streams-bootstrap-test/src/main/java/com/bakdata/kafka/TestTopologyFactory.java @@ -28,61 +28,62 @@ import com.bakdata.fluent_kafka_streams_tests.TestTopology; import com.bakdata.fluent_kafka_streams_tests.junit5.TestTopologyExtension; -import com.bakdata.kafka.KafkaEndpointConfig.KafkaEndpointConfigBuilder; +import io.confluent.kafka.schemaregistry.SchemaProvider; import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; import io.confluent.kafka.schemaregistry.client.SchemaRegistryClientFactory; import java.util.List; import java.util.Map; -import lombok.experimental.UtilityClass; +import java.util.Objects; +import lombok.AccessLevel; +import lombok.RequiredArgsConstructor; /** - * Utility class that provides helpers for using Fluent Kafka Streams Tests with {@link ConfiguredStreamsApp} + * Class that provides helpers for using Fluent Kafka Streams Tests with {@link ConfiguredStreamsApp} */ -@UtilityClass -public class TestTopologyFactory { +@RequiredArgsConstructor(access = AccessLevel.PRIVATE) +public final class TestTopologyFactory { - public static final String SCHEMA_REGISTRY_URL = "mock://"; + private static final String DEFAULT_SCHEMA_REGISTRY_URL = "mock://"; + private final String schemaRegistryUrl; - public static SchemaRegistryClient getSchemaRegistryClient() { - return SchemaRegistryClientFactory.newClient(List.of(SCHEMA_REGISTRY_URL), 0, null, emptyMap(), null); + public static TestTopologyFactory withoutSchemaRegistry() { + 
return withSchemaRegistry(null); } - /** - * Create a {@code TestTopology} from a {@code ConfiguredStreamsApp}. It injects are {@link KafkaEndpointConfig} - * with configured Schema Registry. - * - * @param app ConfiguredStreamsApp to create TestTopology from - * @param Default type of keys - * @param Default type of values - * @return {@code TestTopology} that uses topology and configuration provided by {@code ConfiguredStreamsApp} - * @see ConfiguredStreamsApp#getKafkaProperties(KafkaEndpointConfig) - * @see ConfiguredStreamsApp#createTopology(Map) - */ - public static TestTopology createTopologyWithSchemaRegistry( - final ConfiguredStreamsApp app) { - return new TestTopology<>(app::createTopology, getKafkaPropertiesWithSchemaRegistryUrl(app)); + public static TestTopologyFactory withSchemaRegistry() { + return withSchemaRegistry(DEFAULT_SCHEMA_REGISTRY_URL); + } + + public static TestTopologyFactory withSchemaRegistry(final String schemaRegistryUrl) { + return new TestTopologyFactory(schemaRegistryUrl); } /** - * Create a {@code TestTopologyExtension} from a {@code ConfiguredStreamsApp}. It injects are - * {@link KafkaEndpointConfig} with configured Schema Registry. - * - * @param app ConfiguredStreamsApp to create TestTopology from - * @param Default type of keys - * @param Default type of values - * @return {@code TestTopologyExtension} that uses topology and configuration provided by {@code - * ConfiguredStreamsApp} - * @see ConfiguredStreamsApp#getKafkaProperties(KafkaEndpointConfig) - * @see ConfiguredStreamsApp#createTopology(Map) + * Create {@code Configurator} to configure {@link org.apache.kafka.common.serialization.Serde} and + * {@link org.apache.kafka.common.serialization.Serializer} using the {@code TestTopology} properties. 
+ * @param testTopology {@code TestTopology} to use properties of + * @return {@code Configurator} + * @see TestTopology#getProperties() */ - public static TestTopologyExtension createTopologyExtensionWithSchemaRegistry( - final ConfiguredStreamsApp app) { - return new TestTopologyExtension<>(app::createTopology, getKafkaPropertiesWithSchemaRegistryUrl(app)); + public static Configurator createConfigurator(final TestTopology testTopology) { + return new Configurator(testTopology.getProperties()); + } + + public String getSchemaRegistryUrl() { + return Objects.requireNonNull(this.schemaRegistryUrl); + } + + public SchemaRegistryClient getSchemaRegistryClient() { + return this.getSchemaRegistryClient(null); + } + + public SchemaRegistryClient getSchemaRegistryClient(final List providers) { + return SchemaRegistryClientFactory.newClient(List.of(this.schemaRegistryUrl), 0, providers, emptyMap(), null); } /** - * Create a {@code TestTopology} from a {@code ConfiguredStreamsApp}. It injects are {@link KafkaEndpointConfig} - * without configured Schema Registry. + * Create a {@code TestTopology} from a {@code ConfiguredStreamsApp}. It injects a {@link KafkaEndpointConfig} + * for test purposes with Schema Registry optionally configured. * * @param app ConfiguredStreamsApp to create TestTopology from * @param Default type of keys @@ -91,13 +92,13 @@ public static TestTopologyExtension createTopologyExtensionWithSche * @see ConfiguredStreamsApp#getKafkaProperties(KafkaEndpointConfig) * @see ConfiguredStreamsApp#createTopology(Map) */ - public static TestTopology createTopology(final ConfiguredStreamsApp app) { - return new TestTopology<>(app::createTopology, getKafkaProperties(app)); + public TestTopology createTopology(final ConfiguredStreamsApp app) { + return new TestTopology<>(app::createTopology, this.getKafkaProperties(app)); } /** - * Create a {@code TestTopologyExtension} from a {@code ConfiguredStreamsApp}. 
It injects are - * {@link KafkaEndpointConfig} without configured Schema Registry. + * Create a {@code TestTopologyExtension} from a {@code ConfiguredStreamsApp}. It injects a + * {@link KafkaEndpointConfig} for test purposes with Schema Registry optionally configured. * * @param app ConfiguredStreamsApp to create TestTopology from * @param Default type of keys @@ -107,46 +108,24 @@ public static TestTopology createTopology(final ConfiguredStreamsAp * @see ConfiguredStreamsApp#getKafkaProperties(KafkaEndpointConfig) * @see ConfiguredStreamsApp#createTopology(Map) */ - public static TestTopologyExtension createTopologyExtension( + public TestTopologyExtension createTopologyExtension( final ConfiguredStreamsApp app) { - return new TestTopologyExtension<>(app::createTopology, getKafkaProperties(app)); + return new TestTopologyExtension<>(app::createTopology, this.getKafkaProperties(app)); } /** - * Get Kafka properties from a {@code ConfiguredStreamsApp} after using a {@link KafkaEndpointConfig} with - * configured Schema Registry. + * Get Kafka properties from a {@code ConfiguredStreamsApp} using a {@link KafkaEndpointConfig} for test purposes + * with Schema Registry optionally configured. 
* * @param app ConfiguredStreamsApp to get Kafka properties of * @return Kafka properties * @see ConfiguredStreamsApp#getKafkaProperties(KafkaEndpointConfig) */ - public static Map getKafkaPropertiesWithSchemaRegistryUrl( - final ConfiguredStreamsApp app) { - final KafkaEndpointConfig endpointConfig = newEndpointConfig() - .schemaRegistryUrl(SCHEMA_REGISTRY_URL) + public Map getKafkaProperties(final ConfiguredStreamsApp app) { + final KafkaEndpointConfig endpointConfig = KafkaEndpointConfig.builder() + .bootstrapServers("localhost:9092") + .schemaRegistryUrl(this.schemaRegistryUrl) .build(); return app.getKafkaProperties(endpointConfig); } - - /** - * Create {@code Configurator} to configure {@link org.apache.kafka.common.serialization.Serde} and - * {@link org.apache.kafka.common.serialization.Serializer} using the {@code TestTopology} properties. - * @param testTopology {@code TestTopology} to use properties of - * @return {@code Configurator} - * @see TestTopology#getProperties() - */ - public static Configurator createConfigurator(final TestTopology testTopology) { - return new Configurator(testTopology.getProperties()); - } - - private static Map getKafkaProperties(final ConfiguredStreamsApp app) { - final KafkaEndpointConfig endpointConfig = newEndpointConfig() - .build(); - return app.getKafkaProperties(endpointConfig); - } - - private static KafkaEndpointConfigBuilder newEndpointConfig() { - return KafkaEndpointConfig.builder() - .bootstrapServers("localhost:9092"); - } } From fd02b5e809f7592bef9e2943922a825d0279ef56 Mon Sep 17 00:00:00 2001 From: Philipp Schirmer Date: Thu, 9 Jan 2025 09:55:05 +0100 Subject: [PATCH 6/7] Use Confluent MockSchemaRegistry --- .../bakdata/kafka/integration/KafkaTest.java | 20 ++++----- .../ProducerCleanUpRunnerTest.java | 4 +- .../integration/StreamsCleanUpRunnerTest.java | 20 ++++----- .../kafka/util/SchemaTopicClientTest.java | 32 +++++++-------- .../bakdata/kafka/TestTopologyFactory.java | 41 +++++++++++++++++-- 5 files 
changed, 75 insertions(+), 42 deletions(-) diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/KafkaTest.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/KafkaTest.java index 325a7d6f..f04f2001 100644 --- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/KafkaTest.java +++ b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/KafkaTest.java @@ -35,18 +35,10 @@ @Testcontainers abstract class KafkaTest { - private static final TestTopologyFactory TEST_TOPOLOGY_FACTORY = TestTopologyFactory.withSchemaRegistry(); + private final TestTopologyFactory testTopologyFactory = TestTopologyFactory.withSchemaRegistry(); @Container private final KafkaContainer kafkaCluster = TestUtil.newKafkaCluster(); - static String getSchemaRegistryUrl() { - return TEST_TOPOLOGY_FACTORY.getSchemaRegistryUrl(); - } - - static SchemaRegistryClient getSchemaRegistryClient() { - return TEST_TOPOLOGY_FACTORY.getSchemaRegistryClient(); - } - KafkaEndpointConfig createEndpointWithoutSchemaRegistry() { return KafkaEndpointConfig.builder() .bootstrapServers(this.kafkaCluster.getBootstrapServers()) @@ -56,11 +48,19 @@ KafkaEndpointConfig createEndpointWithoutSchemaRegistry() { KafkaEndpointConfig createEndpoint() { return KafkaEndpointConfig.builder() .bootstrapServers(this.kafkaCluster.getBootstrapServers()) - .schemaRegistryUrl(getSchemaRegistryUrl()) + .schemaRegistryUrl(this.getSchemaRegistryUrl()) .build(); } KafkaContainerHelper newContainerHelper() { return new KafkaContainerHelper(this.kafkaCluster); } + + String getSchemaRegistryUrl() { + return this.testTopologyFactory.getSchemaRegistryUrl(); + } + + SchemaRegistryClient getSchemaRegistryClient() { + return this.testTopologyFactory.getSchemaRegistryClient(); + } } diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/ProducerCleanUpRunnerTest.java 
b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/ProducerCleanUpRunnerTest.java index 0d9ead3d..155890b0 100644 --- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/ProducerCleanUpRunnerTest.java +++ b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/ProducerCleanUpRunnerTest.java @@ -121,7 +121,7 @@ void shouldDeleteTopic() { void shouldDeleteValueSchema() throws IOException, RestClientException { try (final ConfiguredProducerApp app = createAvroValueApplication(); final ExecutableProducerApp executableApp = app.withEndpoint(this.createEndpoint()); - final SchemaRegistryClient client = getSchemaRegistryClient()) { + final SchemaRegistryClient client = this.getSchemaRegistryClient()) { run(executableApp); final String outputTopic = app.getTopics().getOutputTopic(); @@ -137,7 +137,7 @@ void shouldDeleteValueSchema() throws IOException, RestClientException { void shouldDeleteKeySchema() throws IOException, RestClientException { try (final ConfiguredProducerApp app = createAvroKeyApplication(); final ExecutableProducerApp executableApp = app.withEndpoint(this.createEndpoint()); - final SchemaRegistryClient client = getSchemaRegistryClient()) { + final SchemaRegistryClient client = this.getSchemaRegistryClient()) { run(executableApp); final String outputTopic = app.getTopics().getOutputTopic(); diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpRunnerTest.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpRunnerTest.java index 78421620..5e8cfcea 100644 --- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpRunnerTest.java +++ b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpRunnerTest.java @@ -283,7 +283,7 @@ void shouldDeleteInternalTopics() throws InterruptedException { .createTopic(app.getTopics().getOutputTopic(), DEFAULT_TOPIC_SETTINGS, emptyMap()); } 
kafkaContainerHelper.send() - .with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, getSchemaRegistryUrl()) + .with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, this.getSchemaRegistryUrl()) .with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()) .with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName()) .to(app.getTopics().getInputTopics().get(0), List.of( @@ -335,7 +335,7 @@ void shouldDeleteIntermediateTopics() throws InterruptedException { .createTopic(app.getTopics().getOutputTopic(), DEFAULT_TOPIC_SETTINGS, emptyMap()); } kafkaContainerHelper.send() - .with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, getSchemaRegistryUrl()) + .with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, this.getSchemaRegistryUrl()) .with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()) .with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName()) .to(app.getTopics().getInputTopics().get(0), List.of( @@ -441,7 +441,7 @@ void shouldDeleteValueSchema() throws InterruptedException, IOException, RestClientException { try (final ConfiguredStreamsApp app = createMirrorValueApplication(); final ExecutableStreamsApp executableApp = app.withEndpoint(this.createEndpoint()); - final SchemaRegistryClient client = getSchemaRegistryClient()) { + final SchemaRegistryClient client = this.getSchemaRegistryClient()) { final TestRecord testRecord = TestRecord.newBuilder().setContent("key 1").build(); final String inputTopic = app.getTopics().getInputTopics().get(0); final KafkaContainerHelper kafkaContainerHelper = this.newContainerHelper(); @@ -450,7 +450,7 @@ void shouldDeleteValueSchema() .createTopic(app.getTopics().getOutputTopic(), DEFAULT_TOPIC_SETTINGS, emptyMap()); } kafkaContainerHelper.send() - .with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, getSchemaRegistryUrl()) + 
.with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, this.getSchemaRegistryUrl()) .with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName()) .to(app.getTopics().getInputTopics().get(0), List.of( new KeyValue<>(null, testRecord) @@ -475,7 +475,7 @@ void shouldDeleteKeySchema() throws InterruptedException, IOException, RestClientException { try (final ConfiguredStreamsApp app = createMirrorKeyApplication(); final ExecutableStreamsApp executableApp = app.withEndpoint(this.createEndpoint()); - final SchemaRegistryClient client = getSchemaRegistryClient()) { + final SchemaRegistryClient client = this.getSchemaRegistryClient()) { final TestRecord testRecord = TestRecord.newBuilder().setContent("key 1").build(); final String inputTopic = app.getTopics().getInputTopics().get(0); final KafkaContainerHelper kafkaContainerHelper = this.newContainerHelper(); @@ -484,7 +484,7 @@ void shouldDeleteKeySchema() .createTopic(app.getTopics().getOutputTopic(), DEFAULT_TOPIC_SETTINGS, emptyMap()); } kafkaContainerHelper.send() - .with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, getSchemaRegistryUrl()) + .with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, this.getSchemaRegistryUrl()) .with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName()) .to(app.getTopics().getInputTopics().get(0), List.of( new KeyValue<>(testRecord, "val") @@ -509,7 +509,7 @@ void shouldDeleteSchemaOfInternalTopics() throws InterruptedException, IOException, RestClientException { try (final ConfiguredStreamsApp app = this.createComplexApplication(); final ExecutableStreamsApp executableApp = app.withEndpoint(this.createEndpoint()); - final SchemaRegistryClient client = getSchemaRegistryClient()) { + final SchemaRegistryClient client = this.getSchemaRegistryClient()) { final TestRecord testRecord = TestRecord.newBuilder().setContent("key 1").build(); final String inputTopic = app.getTopics().getInputTopics().get(0); 
final KafkaContainerHelper kafkaContainerHelper = this.newContainerHelper(); @@ -518,7 +518,7 @@ void shouldDeleteSchemaOfInternalTopics() .createTopic(app.getTopics().getOutputTopic(), DEFAULT_TOPIC_SETTINGS, emptyMap()); } kafkaContainerHelper.send() - .with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, getSchemaRegistryUrl()) + .with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, this.getSchemaRegistryUrl()) .with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()) .with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName()) .to(app.getTopics().getInputTopics().get(0), List.of( @@ -551,7 +551,7 @@ void shouldDeleteSchemaOfIntermediateTopics() throws InterruptedException, IOException, RestClientException { try (final ConfiguredStreamsApp app = this.createComplexApplication(); final ExecutableStreamsApp executableApp = app.withEndpoint(this.createEndpoint()); - final SchemaRegistryClient client = getSchemaRegistryClient()) { + final SchemaRegistryClient client = this.getSchemaRegistryClient()) { final TestRecord testRecord = TestRecord.newBuilder().setContent("key 1").build(); final String inputTopic = app.getTopics().getInputTopics().get(0); final KafkaContainerHelper kafkaContainerHelper = this.newContainerHelper(); @@ -560,7 +560,7 @@ void shouldDeleteSchemaOfIntermediateTopics() .createTopic(app.getTopics().getOutputTopic(), DEFAULT_TOPIC_SETTINGS, emptyMap()); } kafkaContainerHelper.send() - .with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, getSchemaRegistryUrl()) + .with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, this.getSchemaRegistryUrl()) .with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()) .with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName()) .to(app.getTopics().getInputTopics().get(0), List.of( diff --git 
a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/util/SchemaTopicClientTest.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/util/SchemaTopicClientTest.java index bbf08fd0..2775ef33 100644 --- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/util/SchemaTopicClientTest.java +++ b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/util/SchemaTopicClientTest.java @@ -59,21 +59,13 @@ class SchemaTopicClientTest { private static final Duration TIMEOUT = Duration.ofSeconds(10); private static final String TOPIC = "topic"; - private static final TestTopologyFactory TEST_TOPOLOGY_FACTORY = TestTopologyFactory.withSchemaRegistry(); + private final TestTopologyFactory testTopologyFactory = TestTopologyFactory.withSchemaRegistry(); @Container private final KafkaContainer kafkaCluster = newKafkaCluster(); @InjectSoftAssertions SoftAssertions softly; - private static String getSchemaRegistryUrl() { - return TEST_TOPOLOGY_FACTORY.getSchemaRegistryUrl(); - } - - private static SchemaRegistryClient getSchemaRegistryClient() { - return TEST_TOPOLOGY_FACTORY.getSchemaRegistryClient(); - } - @Test void shouldDeleteTopicAndSchemaWhenSchemaRegistryUrlIsSet() throws InterruptedException, IOException, RestClientException { @@ -87,12 +79,12 @@ void shouldDeleteTopicAndSchemaWhenSchemaRegistryUrlIsSet() kafkaContainerHelper.send() .with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, SpecificAvroSerializer.class) - .with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, getSchemaRegistryUrl()) + .with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, this.getSchemaRegistryUrl()) .to(TOPIC, List.of( new KeyValue<>(null, TestRecord.newBuilder().setContent("foo").build()) )); - final SchemaRegistryClient client = getSchemaRegistryClient(); + final SchemaRegistryClient client = this.getSchemaRegistryClient(); this.softly.assertThat(client.getAllSubjects()) .contains(TOPIC + "-value"); @@ -121,12 +113,12 @@ void shouldResetSchema() throws 
InterruptedException, IOException, RestClientExc kafkaContainerHelper.send() .with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, SpecificAvroSerializer.class) - .with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, getSchemaRegistryUrl()) + .with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, this.getSchemaRegistryUrl()) .to(TOPIC, List.of( new KeyValue<>(null, TestRecord.newBuilder().setContent("foo").build()) )); - final SchemaRegistryClient client = getSchemaRegistryClient(); + final SchemaRegistryClient client = this.getSchemaRegistryClient(); this.softly.assertThat(client.getAllSubjects()) .contains(TOPIC + "-value"); @@ -156,12 +148,12 @@ void shouldDeleteTopicAndKeepSchemaWhenSchemaRegistryUrlIsNotSet() throws Interr kafkaContainerHelper.send() .with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, SpecificAvroSerializer.class) - .with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, getSchemaRegistryUrl()) + .with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, this.getSchemaRegistryUrl()) .to(TOPIC, List.of( new KeyValue<>(null, TestRecord.newBuilder().setContent("foo").build()) )); - final SchemaRegistryClient client = getSchemaRegistryClient(); + final SchemaRegistryClient client = this.getSchemaRegistryClient(); this.softly.assertThat(client.getAllSubjects()) .contains(TOPIC + "-value"); @@ -181,7 +173,7 @@ private SchemaTopicClient createClientWithSchemaRegistry() { final Map kafkaProperties = Map.of( AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, this.kafkaCluster.getBootstrapServers() ); - return SchemaTopicClient.create(kafkaProperties, getSchemaRegistryUrl(), TIMEOUT); + return SchemaTopicClient.create(kafkaProperties, this.getSchemaRegistryUrl(), TIMEOUT); } private SchemaTopicClient createClientWithNoSchemaRegistry() { @@ -191,4 +183,12 @@ private SchemaTopicClient createClientWithNoSchemaRegistry() { return SchemaTopicClient.create(kafkaProperties, TIMEOUT); } + private String getSchemaRegistryUrl() { + 
return this.testTopologyFactory.getSchemaRegistryUrl(); + } + + private SchemaRegistryClient getSchemaRegistryClient() { + return this.testTopologyFactory.getSchemaRegistryClient(); + } + } diff --git a/streams-bootstrap-test/src/main/java/com/bakdata/kafka/TestTopologyFactory.java b/streams-bootstrap-test/src/main/java/com/bakdata/kafka/TestTopologyFactory.java index 8c416ac4..2c6680b9 100644 --- a/streams-bootstrap-test/src/main/java/com/bakdata/kafka/TestTopologyFactory.java +++ b/streams-bootstrap-test/src/main/java/com/bakdata/kafka/TestTopologyFactory.java @@ -34,6 +34,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.UUID; import lombok.AccessLevel; import lombok.RequiredArgsConstructor; @@ -43,17 +44,32 @@ @RequiredArgsConstructor(access = AccessLevel.PRIVATE) public final class TestTopologyFactory { - private static final String DEFAULT_SCHEMA_REGISTRY_URL = "mock://"; + private static final String MOCK_URL_PREFIX = "mock://"; private final String schemaRegistryUrl; + /** + * Create a new {@code TestTopologyFactory} with no configured Schema Registry. + * @return {@code TestTopologyFactory} with no configured Schema Registry + */ public static TestTopologyFactory withoutSchemaRegistry() { return withSchemaRegistry(null); } + /** + * Create a new {@code TestTopologyFactory} with configured + * {@link io.confluent.kafka.schemaregistry.testutil.MockSchemaRegistry}. The scope is random in order to avoid + * collisions between different test instances as scopes are retained globally. + * @return {@code TestTopologyFactory} with configured Schema Registry + */ public static TestTopologyFactory withSchemaRegistry() { - return withSchemaRegistry(DEFAULT_SCHEMA_REGISTRY_URL); + return withSchemaRegistry(MOCK_URL_PREFIX + UUID.randomUUID()); } + /** + * Create a new {@code TestTopologyFactory} with configured Schema Registry. 
+ * @param schemaRegistryUrl Schema Registry URL to use + * @return {@code TestTopologyFactory} with configured Schema Registry + */ public static TestTopologyFactory withSchemaRegistry(final String schemaRegistryUrl) { return new TestTopologyFactory(schemaRegistryUrl); } @@ -69,16 +85,33 @@ public static Configurator createConfigurator(final TestTopology testTopol return new Configurator(testTopology.getProperties()); } + /** + * Get Schema Registry URL if configured + * @return Schema Registry URL + * @throws NullPointerException if Schema Registry is not configured + */ public String getSchemaRegistryUrl() { - return Objects.requireNonNull(this.schemaRegistryUrl); + return Objects.requireNonNull(this.schemaRegistryUrl, "Schema Registry is not configured"); } + /** + * Get {@code SchemaRegistryClient} for configured URL with default providers + * @return {@code SchemaRegistryClient} + * @throws NullPointerException if Schema Registry is not configured + */ public SchemaRegistryClient getSchemaRegistryClient() { return this.getSchemaRegistryClient(null); } + /** + * Get {@code SchemaRegistryClient} for configured URL + * @param providers list of {@code SchemaProvider} to use for {@code SchemaRegistryClient} + * @return {@code SchemaRegistryClient} + * @throws NullPointerException if Schema Registry is not configured + */ public SchemaRegistryClient getSchemaRegistryClient(final List<SchemaProvider> providers) { - return SchemaRegistryClientFactory.newClient(List.of(this.schemaRegistryUrl), 0, providers, emptyMap(), null); + return SchemaRegistryClientFactory.newClient(List.of(this.getSchemaRegistryUrl()), 0, providers, emptyMap(), + null); } /** From 392ae705b4b1eb8fb1d912b4dd5742083c5cf3b6 Mon Sep 17 00:00:00 2001 From: Philipp Schirmer Date: Thu, 9 Jan 2025 11:48:27 +0100 Subject: [PATCH 7/7] Update --- .github/workflows/build-and-publish.yaml | 1 - gradle.properties | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/.github/workflows/build-and-publish.yaml
b/.github/workflows/build-and-publish.yaml index 32117779..8074368f 100644 --- a/.github/workflows/build-and-publish.yaml +++ b/.github/workflows/build-and-publish.yaml @@ -11,7 +11,6 @@ jobs: uses: bakdata/ci-templates/.github/workflows/java-gradle-library.yaml@1.43.0 with: java-version: 17 - gradle-refresh-dependencies: true # TODO remove secrets: sonar-token: ${{ secrets.SONARCLOUD_TOKEN }} sonar-organization: ${{ secrets.SONARCLOUD_ORGANIZATION }} diff --git a/gradle.properties b/gradle.properties index 4f340d48..a45157a2 100644 --- a/gradle.properties +++ b/gradle.properties @@ -5,7 +5,7 @@ org.gradle.parallel=false kafkaVersion=3.8.1 testContainersVersion=1.20.4 confluentVersion=7.8.0 -fluentKafkaVersion=2.16.1-SNAPSHOT +fluentKafkaVersion=3.0.0 junitVersion=5.11.4 mockitoVersion=5.15.2 assertJVersion=3.27.2