Skip to content

Commit

Permalink
Use Confluent MockSchemaRegistry
Browse files Browse the repository at this point in the history
  • Loading branch information
philipp94831 committed Jan 9, 2025
1 parent 70aaa1a commit e036f9d
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,28 +24,19 @@

package com.bakdata.kafka.integration;

import static java.util.Collections.emptyMap;

import com.bakdata.kafka.KafkaContainerHelper;
import com.bakdata.kafka.KafkaEndpointConfig;
import com.bakdata.kafka.TestTopologyFactory;
import com.bakdata.kafka.TestUtil;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClientFactory;
import java.util.List;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.kafka.KafkaContainer;

@Testcontainers
abstract class KafkaTest {
static final String SCHEMA_REGISTRY_URL = "mock://";
@Container
private final KafkaContainer kafkaCluster = TestUtil.newKafkaCluster();

static SchemaRegistryClient getSchemaRegistryClient() {
return SchemaRegistryClientFactory.newClient(List.of(SCHEMA_REGISTRY_URL), 0, null, emptyMap(), null);
}

KafkaEndpointConfig createEndpointWithoutSchemaRegistry() {
return KafkaEndpointConfig.builder()
.bootstrapServers(this.kafkaCluster.getBootstrapServers())
Expand All @@ -55,7 +46,7 @@ KafkaEndpointConfig createEndpointWithoutSchemaRegistry() {
KafkaEndpointConfig createEndpoint() {
return KafkaEndpointConfig.builder()
.bootstrapServers(this.kafkaCluster.getBootstrapServers())
.schemaRegistryUrl(SCHEMA_REGISTRY_URL)
.schemaRegistryUrl(TestTopologyFactory.SCHEMA_REGISTRY_URL)
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
package com.bakdata.kafka.integration;


import static com.bakdata.kafka.TestTopologyFactory.getSchemaRegistryClient;
import static com.bakdata.kafka.integration.ProducerRunnerTest.configureApp;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@


import static com.bakdata.kafka.KafkaContainerHelper.DEFAULT_TOPIC_SETTINGS;
import static com.bakdata.kafka.TestTopologyFactory.SCHEMA_REGISTRY_URL;
import static com.bakdata.kafka.TestTopologyFactory.getSchemaRegistryClient;
import static com.bakdata.kafka.integration.StreamsRunnerTest.configureApp;
import static java.util.Collections.emptyMap;
import static org.mockito.Mockito.verify;
Expand Down Expand Up @@ -474,33 +476,34 @@ void shouldDeleteValueSchema()
void shouldDeleteKeySchema()
throws InterruptedException, IOException, RestClientException {
try (final ConfiguredStreamsApp<StreamsApp> app = createMirrorKeyApplication();
final ExecutableStreamsApp<StreamsApp> executableApp = app.withEndpoint(this.createEndpoint());
final SchemaRegistryClient client = getSchemaRegistryClient()) {
final TestRecord testRecord = TestRecord.newBuilder().setContent("key 1").build();
final String inputTopic = app.getTopics().getInputTopics().get(0);
final KafkaContainerHelper kafkaContainerHelper = this.newContainerHelper();
try (final ImprovedAdminClient admin = kafkaContainerHelper.admin()) {
admin.getTopicClient()
.createTopic(app.getTopics().getOutputTopic(), DEFAULT_TOPIC_SETTINGS, emptyMap());
final ExecutableStreamsApp<StreamsApp> executableApp = app.withEndpoint(this.createEndpoint())) {
try (final SchemaRegistryClient client = getSchemaRegistryClient()) {
final TestRecord testRecord = TestRecord.newBuilder().setContent("key 1").build();
final String inputTopic = app.getTopics().getInputTopics().get(0);
final KafkaContainerHelper kafkaContainerHelper = this.newContainerHelper();
try (final ImprovedAdminClient admin = kafkaContainerHelper.admin()) {
admin.getTopicClient()
.createTopic(app.getTopics().getOutputTopic(), DEFAULT_TOPIC_SETTINGS, emptyMap());
}
kafkaContainerHelper.send()
.with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, SCHEMA_REGISTRY_URL)
.with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName())
.to(app.getTopics().getInputTopics().get(0), List.of(
new KeyValue<>(testRecord, "val")
));

run(executableApp);

// Wait until all stream application are completely stopped before triggering cleanup
Thread.sleep(TIMEOUT.toMillis());
final String outputTopic = app.getTopics().getOutputTopic();
this.softly.assertThat(client.getAllSubjects())
.contains(outputTopic + "-key", inputTopic + "-key");
clean(executableApp);
this.softly.assertThat(client.getAllSubjects())
.doesNotContain(outputTopic + "-key")
.contains(inputTopic + "-key");
}
kafkaContainerHelper.send()
.with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, SCHEMA_REGISTRY_URL)
.with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName())
.to(app.getTopics().getInputTopics().get(0), List.of(
new KeyValue<>(testRecord, "val")
));

run(executableApp);

// Wait until all stream application are completely stopped before triggering cleanup
Thread.sleep(TIMEOUT.toMillis());
final String outputTopic = app.getTopics().getOutputTopic();
this.softly.assertThat(client.getAllSubjects())
.contains(outputTopic + "-key", inputTopic + "-key");
clean(executableApp);
this.softly.assertThat(client.getAllSubjects())
.doesNotContain(outputTopic + "-key")
.contains(inputTopic + "-key");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,14 @@


import static com.bakdata.kafka.KafkaContainerHelper.DEFAULT_TOPIC_SETTINGS;
import static com.bakdata.kafka.TestTopologyFactory.SCHEMA_REGISTRY_URL;
import static com.bakdata.kafka.TestTopologyFactory.getSchemaRegistryClient;
import static com.bakdata.kafka.TestUtil.newKafkaCluster;
import static java.util.Collections.emptyMap;

import com.bakdata.kafka.KafkaContainerHelper;
import com.bakdata.kafka.TestRecord;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClientFactory;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer;
Expand All @@ -57,7 +58,6 @@
@Slf4j
@ExtendWith(SoftAssertionsExtension.class)
class SchemaTopicClientTest {
private static final String SCHEMA_REGISTRY_URL = "mock://";
private static final Duration TIMEOUT = Duration.ofSeconds(10);
private static final String TOPIC = "topic";
@Container
Expand All @@ -66,10 +66,6 @@ class SchemaTopicClientTest {
@InjectSoftAssertions
SoftAssertions softly;

private static SchemaRegistryClient getSchemaRegistryClient() {
return SchemaRegistryClientFactory.newClient(List.of(SCHEMA_REGISTRY_URL), 0, null, emptyMap(), null);
}

@Test
void shouldDeleteTopicAndSchemaWhenSchemaRegistryUrlIsSet()
throws InterruptedException, IOException, RestClientException {
Expand Down

0 comments on commit e036f9d

Please sign in to comment.