From 8416e8d85abe507fbd31083acccd2a6d4a2ff6f6 Mon Sep 17 00:00:00 2001 From: Philipp Schirmer Date: Thu, 9 Jan 2025 07:48:18 +0100 Subject: [PATCH] Use Confluent MockSchemaRegistry --- .../integration/StreamsCleanUpRunnerTest.java | 53 +++++++++---------- 1 file changed, 26 insertions(+), 27 deletions(-) diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpRunnerTest.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpRunnerTest.java index 0f5864d2..8a43a7df 100644 --- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpRunnerTest.java +++ b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpRunnerTest.java @@ -476,34 +476,33 @@ void shouldDeleteValueSchema() void shouldDeleteKeySchema() throws InterruptedException, IOException, RestClientException { try (final ConfiguredStreamsApp app = createMirrorKeyApplication(); - final ExecutableStreamsApp executableApp = app.withEndpoint(this.createEndpoint())) { - try (final SchemaRegistryClient client = getSchemaRegistryClient()) { - final TestRecord testRecord = TestRecord.newBuilder().setContent("key 1").build(); - final String inputTopic = app.getTopics().getInputTopics().get(0); - final KafkaContainerHelper kafkaContainerHelper = this.newContainerHelper(); - try (final ImprovedAdminClient admin = kafkaContainerHelper.admin()) { - admin.getTopicClient() - .createTopic(app.getTopics().getOutputTopic(), DEFAULT_TOPIC_SETTINGS, emptyMap()); - } - kafkaContainerHelper.send() - .with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, SCHEMA_REGISTRY_URL) - .with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName()) - .to(app.getTopics().getInputTopics().get(0), List.of( - new KeyValue<>(testRecord, "val") - )); - - run(executableApp); - - // Wait until all stream application are completely stopped before triggering cleanup - Thread.sleep(TIMEOUT.toMillis()); - final String outputTopic = app.getTopics().getOutputTopic(); - this.softly.assertThat(client.getAllSubjects()) - .contains(outputTopic + "-key", inputTopic + "-key"); - clean(executableApp); - this.softly.assertThat(client.getAllSubjects()) - .doesNotContain(outputTopic + "-key") - .contains(inputTopic + "-key"); + final ExecutableStreamsApp executableApp = app.withEndpoint(this.createEndpoint()); + final SchemaRegistryClient client = getSchemaRegistryClient()) { + final TestRecord testRecord = TestRecord.newBuilder().setContent("key 1").build(); + final String inputTopic = app.getTopics().getInputTopics().get(0); + final KafkaContainerHelper kafkaContainerHelper = this.newContainerHelper(); + try (final ImprovedAdminClient admin = kafkaContainerHelper.admin()) { + admin.getTopicClient() + .createTopic(app.getTopics().getOutputTopic(), DEFAULT_TOPIC_SETTINGS, emptyMap()); } + kafkaContainerHelper.send() + .with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, SCHEMA_REGISTRY_URL) + .with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName()) + .to(app.getTopics().getInputTopics().get(0), List.of( + new KeyValue<>(testRecord, "val") + )); + + run(executableApp); + + // Wait until all stream application are completely stopped before triggering cleanup + Thread.sleep(TIMEOUT.toMillis()); + final String outputTopic = app.getTopics().getOutputTopic(); + this.softly.assertThat(client.getAllSubjects()) + .contains(outputTopic + "-key", inputTopic + "-key"); + clean(executableApp); + this.softly.assertThat(client.getAllSubjects()) + .doesNotContain(outputTopic + "-key") + .contains(inputTopic + "-key"); } }