Skip to content

Commit

Permalink
Use Confluent MockSchemaRegistry
Browse files Browse the repository at this point in the history
  • Loading branch information
philipp94831 committed Jan 9, 2025
1 parent e036f9d commit 8416e8d
Showing 1 changed file with 26 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -476,34 +476,33 @@ void shouldDeleteValueSchema()
void shouldDeleteKeySchema()
throws InterruptedException, IOException, RestClientException {
try (final ConfiguredStreamsApp<StreamsApp> app = createMirrorKeyApplication();
final ExecutableStreamsApp<StreamsApp> executableApp = app.withEndpoint(this.createEndpoint())) {
try (final SchemaRegistryClient client = getSchemaRegistryClient()) {
final TestRecord testRecord = TestRecord.newBuilder().setContent("key 1").build();
final String inputTopic = app.getTopics().getInputTopics().get(0);
final KafkaContainerHelper kafkaContainerHelper = this.newContainerHelper();
try (final ImprovedAdminClient admin = kafkaContainerHelper.admin()) {
admin.getTopicClient()
.createTopic(app.getTopics().getOutputTopic(), DEFAULT_TOPIC_SETTINGS, emptyMap());
}
kafkaContainerHelper.send()
.with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, SCHEMA_REGISTRY_URL)
.with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName())
.to(app.getTopics().getInputTopics().get(0), List.of(
new KeyValue<>(testRecord, "val")
));

run(executableApp);

// Wait until all stream application are completely stopped before triggering cleanup
Thread.sleep(TIMEOUT.toMillis());
final String outputTopic = app.getTopics().getOutputTopic();
this.softly.assertThat(client.getAllSubjects())
.contains(outputTopic + "-key", inputTopic + "-key");
clean(executableApp);
this.softly.assertThat(client.getAllSubjects())
.doesNotContain(outputTopic + "-key")
.contains(inputTopic + "-key");
final ExecutableStreamsApp<StreamsApp> executableApp = app.withEndpoint(this.createEndpoint());
final SchemaRegistryClient client = getSchemaRegistryClient()) {
final TestRecord testRecord = TestRecord.newBuilder().setContent("key 1").build();
final String inputTopic = app.getTopics().getInputTopics().get(0);
final KafkaContainerHelper kafkaContainerHelper = this.newContainerHelper();
try (final ImprovedAdminClient admin = kafkaContainerHelper.admin()) {
admin.getTopicClient()
.createTopic(app.getTopics().getOutputTopic(), DEFAULT_TOPIC_SETTINGS, emptyMap());
}
kafkaContainerHelper.send()
.with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, SCHEMA_REGISTRY_URL)
.with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName())
.to(app.getTopics().getInputTopics().get(0), List.of(
new KeyValue<>(testRecord, "val")
));

run(executableApp);

// Wait until all stream application are completely stopped before triggering cleanup
Thread.sleep(TIMEOUT.toMillis());
final String outputTopic = app.getTopics().getOutputTopic();
this.softly.assertThat(client.getAllSubjects())
.contains(outputTopic + "-key", inputTopic + "-key");
clean(executableApp);
this.softly.assertThat(client.getAllSubjects())
.doesNotContain(outputTopic + "-key")
.contains(inputTopic + "-key");
}
}

Expand Down

0 comments on commit 8416e8d

Please sign in to comment.