Skip to content

Commit

Permalink
Use Confluent MockSchemaRegistry
Browse files Browse the repository at this point in the history
  • Loading branch information
philipp94831 committed Jan 9, 2025
1 parent 8416e8d commit c1bc5c5
Show file tree
Hide file tree
Showing 6 changed files with 83 additions and 89 deletions.
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/*
* MIT License
*
* Copyright (c) 2024 bakdata
* Copyright (c) 2025 bakdata
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
Expand Down Expand Up @@ -35,7 +35,7 @@ class AvroMirrorTest {
private final ConfiguredStreamsApp<MirrorWithNonDefaultSerde> app = createApp();
@RegisterExtension
final TestTopologyExtension<TestRecord, TestRecord> testTopology =
TestTopologyFactory.createTopologyExtensionWithSchemaRegistry(this.app);
TestTopologyFactory.withSchemaRegistry().createTopologyExtension(this.app);

private static ConfiguredStreamsApp<MirrorWithNonDefaultSerde> createApp() {
final AppConfiguration<StreamsTopicConfig> configuration = new AppConfiguration<>(StreamsTopicConfig.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,25 @@
import com.bakdata.kafka.KafkaEndpointConfig;
import com.bakdata.kafka.TestTopologyFactory;
import com.bakdata.kafka.TestUtil;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.kafka.KafkaContainer;

@Testcontainers
abstract class KafkaTest {
private static final TestTopologyFactory TEST_TOPOLOGY_FACTORY = TestTopologyFactory.withSchemaRegistry();
@Container
private final KafkaContainer kafkaCluster = TestUtil.newKafkaCluster();

/** Returns the Schema Registry URL of the shared {@code TestTopologyFactory} used by this test class. */
static String getSchemaRegistryUrl() {
return TEST_TOPOLOGY_FACTORY.getSchemaRegistryUrl();
}

/** Returns a {@link SchemaRegistryClient} backed by the shared {@code TestTopologyFactory} of this test class. */
static SchemaRegistryClient getSchemaRegistryClient() {
return TEST_TOPOLOGY_FACTORY.getSchemaRegistryClient();
}

KafkaEndpointConfig createEndpointWithoutSchemaRegistry() {
return KafkaEndpointConfig.builder()
.bootstrapServers(this.kafkaCluster.getBootstrapServers())
Expand All @@ -46,7 +56,7 @@ KafkaEndpointConfig createEndpointWithoutSchemaRegistry() {
KafkaEndpointConfig createEndpoint() {
return KafkaEndpointConfig.builder()
.bootstrapServers(this.kafkaCluster.getBootstrapServers())
.schemaRegistryUrl(TestTopologyFactory.SCHEMA_REGISTRY_URL)
.schemaRegistryUrl(getSchemaRegistryUrl())
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
package com.bakdata.kafka.integration;


import static com.bakdata.kafka.TestTopologyFactory.getSchemaRegistryClient;
import static com.bakdata.kafka.integration.ProducerRunnerTest.configureApp;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@


import static com.bakdata.kafka.KafkaContainerHelper.DEFAULT_TOPIC_SETTINGS;
import static com.bakdata.kafka.TestTopologyFactory.SCHEMA_REGISTRY_URL;
import static com.bakdata.kafka.TestTopologyFactory.getSchemaRegistryClient;
import static com.bakdata.kafka.integration.StreamsRunnerTest.configureApp;
import static java.util.Collections.emptyMap;
import static org.mockito.Mockito.verify;
Expand Down Expand Up @@ -285,7 +283,7 @@ void shouldDeleteInternalTopics() throws InterruptedException {
.createTopic(app.getTopics().getOutputTopic(), DEFAULT_TOPIC_SETTINGS, emptyMap());
}
kafkaContainerHelper.send()
.with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, SCHEMA_REGISTRY_URL)
.with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, getSchemaRegistryUrl())
.with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName())
.with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName())
.to(app.getTopics().getInputTopics().get(0), List.of(
Expand Down Expand Up @@ -337,7 +335,7 @@ void shouldDeleteIntermediateTopics() throws InterruptedException {
.createTopic(app.getTopics().getOutputTopic(), DEFAULT_TOPIC_SETTINGS, emptyMap());
}
kafkaContainerHelper.send()
.with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, SCHEMA_REGISTRY_URL)
.with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, getSchemaRegistryUrl())
.with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName())
.with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName())
.to(app.getTopics().getInputTopics().get(0), List.of(
Expand Down Expand Up @@ -452,7 +450,7 @@ void shouldDeleteValueSchema()
.createTopic(app.getTopics().getOutputTopic(), DEFAULT_TOPIC_SETTINGS, emptyMap());
}
kafkaContainerHelper.send()
.with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, SCHEMA_REGISTRY_URL)
.with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, getSchemaRegistryUrl())
.with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName())
.to(app.getTopics().getInputTopics().get(0), List.of(
new KeyValue<>(null, testRecord)
Expand Down Expand Up @@ -486,7 +484,7 @@ void shouldDeleteKeySchema()
.createTopic(app.getTopics().getOutputTopic(), DEFAULT_TOPIC_SETTINGS, emptyMap());
}
kafkaContainerHelper.send()
.with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, SCHEMA_REGISTRY_URL)
.with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, getSchemaRegistryUrl())
.with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName())
.to(app.getTopics().getInputTopics().get(0), List.of(
new KeyValue<>(testRecord, "val")
Expand Down Expand Up @@ -520,7 +518,7 @@ void shouldDeleteSchemaOfInternalTopics()
.createTopic(app.getTopics().getOutputTopic(), DEFAULT_TOPIC_SETTINGS, emptyMap());
}
kafkaContainerHelper.send()
.with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, SCHEMA_REGISTRY_URL)
.with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, getSchemaRegistryUrl())
.with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName())
.with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName())
.to(app.getTopics().getInputTopics().get(0), List.of(
Expand Down Expand Up @@ -562,7 +560,7 @@ void shouldDeleteSchemaOfIntermediateTopics()
.createTopic(app.getTopics().getOutputTopic(), DEFAULT_TOPIC_SETTINGS, emptyMap());
}
kafkaContainerHelper.send()
.with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, SCHEMA_REGISTRY_URL)
.with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, getSchemaRegistryUrl())
.with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName())
.with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName())
.to(app.getTopics().getInputTopics().get(0), List.of(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,12 @@


import static com.bakdata.kafka.KafkaContainerHelper.DEFAULT_TOPIC_SETTINGS;
import static com.bakdata.kafka.TestTopologyFactory.SCHEMA_REGISTRY_URL;
import static com.bakdata.kafka.TestTopologyFactory.getSchemaRegistryClient;
import static com.bakdata.kafka.TestUtil.newKafkaCluster;
import static java.util.Collections.emptyMap;

import com.bakdata.kafka.KafkaContainerHelper;
import com.bakdata.kafka.TestRecord;
import com.bakdata.kafka.TestTopologyFactory;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
Expand Down Expand Up @@ -60,12 +59,21 @@
class SchemaTopicClientTest {
private static final Duration TIMEOUT = Duration.ofSeconds(10);
private static final String TOPIC = "topic";
private static final TestTopologyFactory TEST_TOPOLOGY_FACTORY = TestTopologyFactory.withSchemaRegistry();
@Container
private final KafkaContainer kafkaCluster = newKafkaCluster();

@InjectSoftAssertions
SoftAssertions softly;

// Schema Registry URL of the class-wide factory; shared by all tests in this class.
private static String getSchemaRegistryUrl() {
return TEST_TOPOLOGY_FACTORY.getSchemaRegistryUrl();
}

// Client for the class-wide mock Schema Registry; shared by all tests in this class.
private static SchemaRegistryClient getSchemaRegistryClient() {
return TEST_TOPOLOGY_FACTORY.getSchemaRegistryClient();
}

@Test
void shouldDeleteTopicAndSchemaWhenSchemaRegistryUrlIsSet()
throws InterruptedException, IOException, RestClientException {
Expand All @@ -79,7 +87,7 @@ void shouldDeleteTopicAndSchemaWhenSchemaRegistryUrlIsSet()

kafkaContainerHelper.send()
.with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, SpecificAvroSerializer.class)
.with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, SCHEMA_REGISTRY_URL)
.with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, getSchemaRegistryUrl())
.to(TOPIC, List.of(
new KeyValue<>(null, TestRecord.newBuilder().setContent("foo").build())
));
Expand Down Expand Up @@ -113,7 +121,7 @@ void shouldResetSchema() throws InterruptedException, IOException, RestClientExc

kafkaContainerHelper.send()
.with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, SpecificAvroSerializer.class)
.with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, SCHEMA_REGISTRY_URL)
.with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, getSchemaRegistryUrl())
.to(TOPIC, List.of(
new KeyValue<>(null, TestRecord.newBuilder().setContent("foo").build())
));
Expand Down Expand Up @@ -148,7 +156,7 @@ void shouldDeleteTopicAndKeepSchemaWhenSchemaRegistryUrlIsNotSet() throws Interr

kafkaContainerHelper.send()
.with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, SpecificAvroSerializer.class)
.with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, SCHEMA_REGISTRY_URL)
.with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, getSchemaRegistryUrl())
.to(TOPIC, List.of(
new KeyValue<>(null, TestRecord.newBuilder().setContent("foo").build())
));
Expand All @@ -173,7 +181,7 @@ private SchemaTopicClient createClientWithSchemaRegistry() {
final Map<String, Object> kafkaProperties = Map.of(
AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, this.kafkaCluster.getBootstrapServers()
);
return SchemaTopicClient.create(kafkaProperties, SCHEMA_REGISTRY_URL, TIMEOUT);
return SchemaTopicClient.create(kafkaProperties, getSchemaRegistryUrl(), TIMEOUT);
}

private SchemaTopicClient createClientWithNoSchemaRegistry() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,61 +28,62 @@

import com.bakdata.fluent_kafka_streams_tests.TestTopology;
import com.bakdata.fluent_kafka_streams_tests.junit5.TestTopologyExtension;
import com.bakdata.kafka.KafkaEndpointConfig.KafkaEndpointConfigBuilder;
import io.confluent.kafka.schemaregistry.SchemaProvider;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClientFactory;
import java.util.List;
import java.util.Map;
import lombok.experimental.UtilityClass;
import java.util.Objects;
import lombok.AccessLevel;
import lombok.RequiredArgsConstructor;

/**
* Utility class that provides helpers for using Fluent Kafka Streams Tests with {@link ConfiguredStreamsApp}
* Class that provides helpers for using Fluent Kafka Streams Tests with {@link ConfiguredStreamsApp}
*/
@UtilityClass
public class TestTopologyFactory {
@RequiredArgsConstructor(access = AccessLevel.PRIVATE)
public final class TestTopologyFactory {

public static final String SCHEMA_REGISTRY_URL = "mock://";
private static final String DEFAULT_SCHEMA_REGISTRY_URL = "mock://";
private final String schemaRegistryUrl;

public static SchemaRegistryClient getSchemaRegistryClient() {
return SchemaRegistryClientFactory.newClient(List.of(SCHEMA_REGISTRY_URL), 0, null, emptyMap(), null);
/**
 * Create a {@code TestTopologyFactory} with no Schema Registry configured.
 *
 * @return {@code TestTopologyFactory} whose Schema Registry URL is {@code null}
 */
public static TestTopologyFactory withoutSchemaRegistry() {
return withSchemaRegistry(null);
}

/**
* Create a {@code TestTopology} from a {@code ConfiguredStreamsApp}. It injects a {@link KafkaEndpointConfig}
* with configured Schema Registry.
*
* @param app ConfiguredStreamsApp to create TestTopology from
* @param <K> Default type of keys
* @param <V> Default type of values
* @return {@code TestTopology} that uses topology and configuration provided by {@code ConfiguredStreamsApp}
* @see ConfiguredStreamsApp#getKafkaProperties(KafkaEndpointConfig)
* @see ConfiguredStreamsApp#createTopology(Map)
*/
public static <K, V> TestTopology<K, V> createTopologyWithSchemaRegistry(
final ConfiguredStreamsApp<? extends StreamsApp> app) {
return new TestTopology<>(app::createTopology, getKafkaPropertiesWithSchemaRegistryUrl(app));
/**
 * Create a {@code TestTopologyFactory} using the default mock Schema Registry URL ({@code mock://}).
 *
 * @return {@code TestTopologyFactory} configured with the default mock Schema Registry
 */
public static TestTopologyFactory withSchemaRegistry() {
return withSchemaRegistry(DEFAULT_SCHEMA_REGISTRY_URL);
}

/**
 * Create a {@code TestTopologyFactory} with the given Schema Registry URL.
 *
 * @param schemaRegistryUrl Schema Registry URL to use; may be {@code null} for no Schema Registry
 * @return {@code TestTopologyFactory} configured with the given Schema Registry URL
 */
public static TestTopologyFactory withSchemaRegistry(final String schemaRegistryUrl) {
return new TestTopologyFactory(schemaRegistryUrl);
}

/**
* Create a {@code TestTopologyExtension} from a {@code ConfiguredStreamsApp}. It injects a
* {@link KafkaEndpointConfig} with configured Schema Registry.
*
* @param app ConfiguredStreamsApp to create TestTopology from
* @param <K> Default type of keys
* @param <V> Default type of values
* @return {@code TestTopologyExtension} that uses topology and configuration provided by {@code
* ConfiguredStreamsApp}
* @see ConfiguredStreamsApp#getKafkaProperties(KafkaEndpointConfig)
* @see ConfiguredStreamsApp#createTopology(Map)
* Create {@code Configurator} to configure {@link org.apache.kafka.common.serialization.Serde} and
* {@link org.apache.kafka.common.serialization.Serializer} using the {@code TestTopology} properties.
* @param testTopology {@code TestTopology} to use properties of
* @return {@code Configurator}
* @see TestTopology#getProperties()
*/
public static <K, V> TestTopologyExtension<K, V> createTopologyExtensionWithSchemaRegistry(
final ConfiguredStreamsApp<? extends StreamsApp> app) {
return new TestTopologyExtension<>(app::createTopology, getKafkaPropertiesWithSchemaRegistryUrl(app));
// Wraps the topology's properties so Serdes/Serializers can be configured consistently with the test setup.
public static Configurator createConfigurator(final TestTopology<?, ?> testTopology) {
return new Configurator(testTopology.getProperties());
}

/**
 * Get the Schema Registry URL configured for this factory.
 *
 * @return configured Schema Registry URL
 * @throws NullPointerException if this factory was created without a Schema Registry,
 *         e.g. via {@code withoutSchemaRegistry()}
 */
public String getSchemaRegistryUrl() {
    // Fail fast with a descriptive message instead of a bare NPE when no Schema Registry is configured
    return Objects.requireNonNull(this.schemaRegistryUrl,
            "Schema Registry is not configured for this TestTopologyFactory");
}

/**
 * Get a {@link SchemaRegistryClient} for this factory's Schema Registry URL with no additional schema providers.
 *
 * @return {@code SchemaRegistryClient}
 */
public SchemaRegistryClient getSchemaRegistryClient() {
return this.getSchemaRegistryClient(null);
}

/**
 * Get a {@link SchemaRegistryClient} for this factory's Schema Registry URL using the given schema providers.
 *
 * @param providers schema providers to use; may be {@code null}
 * @return {@code SchemaRegistryClient}
 */
// NOTE(review): List.of rejects null elements, so this throws an NPE if the factory was created
// without a Schema Registry — presumably callers only use this with withSchemaRegistry(); verify.
public SchemaRegistryClient getSchemaRegistryClient(final List<SchemaProvider> providers) {
return SchemaRegistryClientFactory.newClient(List.of(this.schemaRegistryUrl), 0, providers, emptyMap(), null);
}

/**
* Create a {@code TestTopology} from a {@code ConfiguredStreamsApp}. It injects a {@link KafkaEndpointConfig}
* without configured Schema Registry.
* Create a {@code TestTopology} from a {@code ConfiguredStreamsApp}. It injects a {@link KafkaEndpointConfig}
* for test purposes with Schema Registry optionally configured.
*
* @param app ConfiguredStreamsApp to create TestTopology from
* @param <K> Default type of keys
Expand All @@ -91,13 +92,13 @@ public static <K, V> TestTopologyExtension<K, V> createTopologyExtensionWithSche
* @see ConfiguredStreamsApp#getKafkaProperties(KafkaEndpointConfig)
* @see ConfiguredStreamsApp#createTopology(Map)
*/
public static <K, V> TestTopology<K, V> createTopology(final ConfiguredStreamsApp<? extends StreamsApp> app) {
return new TestTopology<>(app::createTopology, getKafkaProperties(app));
public <K, V> TestTopology<K, V> createTopology(final ConfiguredStreamsApp<? extends StreamsApp> app) {
return new TestTopology<>(app::createTopology, this.getKafkaProperties(app));
}

/**
* Create a {@code TestTopologyExtension} from a {@code ConfiguredStreamsApp}. It injects are
* {@link KafkaEndpointConfig} without configured Schema Registry.
* Create a {@code TestTopologyExtension} from a {@code ConfiguredStreamsApp}. It injects a
* {@link KafkaEndpointConfig} for test purposes with Schema Registry optionally configured.
*
* @param app ConfiguredStreamsApp to create TestTopology from
* @param <K> Default type of keys
Expand All @@ -107,46 +108,24 @@ public static <K, V> TestTopology<K, V> createTopology(final ConfiguredStreamsAp
* @see ConfiguredStreamsApp#getKafkaProperties(KafkaEndpointConfig)
* @see ConfiguredStreamsApp#createTopology(Map)
*/
public static <K, V> TestTopologyExtension<K, V> createTopologyExtension(
public <K, V> TestTopologyExtension<K, V> createTopologyExtension(
final ConfiguredStreamsApp<? extends StreamsApp> app) {
return new TestTopologyExtension<>(app::createTopology, getKafkaProperties(app));
return new TestTopologyExtension<>(app::createTopology, this.getKafkaProperties(app));
}

/**
* Get Kafka properties from a {@code ConfiguredStreamsApp} after using a {@link KafkaEndpointConfig} with
* configured Schema Registry.
* Get Kafka properties from a {@code ConfiguredStreamsApp} using a {@link KafkaEndpointConfig} for test purposes
* with Schema Registry optionally configured.
*
* @param app ConfiguredStreamsApp to get Kafka properties of
* @return Kafka properties
* @see ConfiguredStreamsApp#getKafkaProperties(KafkaEndpointConfig)
*/
public static Map<String, Object> getKafkaPropertiesWithSchemaRegistryUrl(
final ConfiguredStreamsApp<? extends StreamsApp> app) {
final KafkaEndpointConfig endpointConfig = newEndpointConfig()
.schemaRegistryUrl(SCHEMA_REGISTRY_URL)
public Map<String, Object> getKafkaProperties(final ConfiguredStreamsApp<? extends StreamsApp> app) {
// Dummy bootstrap servers: the TestTopology never connects to a real broker,
// but KafkaEndpointConfig requires a value.
final KafkaEndpointConfig endpointConfig = KafkaEndpointConfig.builder()
.bootstrapServers("localhost:9092")
// May be null when created via withoutSchemaRegistry(); the endpoint config is built without it then.
.schemaRegistryUrl(this.schemaRegistryUrl)
.build();
return app.getKafkaProperties(endpointConfig);
}

/**
* Create {@code Configurator} to configure {@link org.apache.kafka.common.serialization.Serde} and
* {@link org.apache.kafka.common.serialization.Serializer} using the {@code TestTopology} properties.
* @param testTopology {@code TestTopology} to use properties of
* @return {@code Configurator}
* @see TestTopology#getProperties()
*/
public static Configurator createConfigurator(final TestTopology<?, ?> testTopology) {
return new Configurator(testTopology.getProperties());
}

private static Map<String, Object> getKafkaProperties(final ConfiguredStreamsApp<? extends StreamsApp> app) {
final KafkaEndpointConfig endpointConfig = newEndpointConfig()
.build();
return app.getKafkaProperties(endpointConfig);
}

private static KafkaEndpointConfigBuilder newEndpointConfig() {
return KafkaEndpointConfig.builder()
.bootstrapServers("localhost:9092");
}
}

0 comments on commit c1bc5c5

Please sign in to comment.