Skip to content

Commit

Permalink
Update
Browse files Browse the repository at this point in the history
  • Loading branch information
philipp94831 committed Jul 29, 2024
1 parent aaf6a68 commit caaf0a0
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,6 @@ public void run() {
"--kafka-config", "foo=1,bar=2",
});
assertThat(app.getBootstrapServers()).isEqualTo("bootstrap-servers");
assertThat(app.getSchemaRegistryUrl()).isEqualTo("schema-registry");
assertThat(app.getInputTopics()).containsExactly("input1", "input2");
assertThat(app.getLabeledInputTopics())
.hasSize(2)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
*/
@UtilityClass
@Slf4j
public class SchemaRegistryKafkaApplicationUtils {
public class SchemaRegistryAppUtils {
private static final int CACHE_CAPACITY = 100;

/**
Expand Down Expand Up @@ -89,7 +89,7 @@ public static SchemaRegistryClient createSchemaRegistryClient(@NonNull final Map
* @return hook that cleans up schemas associated with a topic
* @see HasTopicHooks#registerTopicHook(TopicHook)
*/
public static TopicHook createSchemaRegistryCleanUpHook(final Map<String, Object> kafkaProperties) {
public static TopicHook createTopicHook(final Map<String, Object> kafkaProperties) {
final SchemaRegistryClient schemaRegistryClient = createSchemaRegistryClient(kafkaProperties);
return new SchemaRegistryTopicHook(schemaRegistryClient);
}
Expand All @@ -101,10 +101,23 @@ public static TopicHook createSchemaRegistryCleanUpHook(final Map<String, Object
*
* @param configuration Configuration to create hook from
* @return hook that cleans up schemas associated with a topic
* @see #createSchemaRegistryCleanUpHook(Map)
* @see #createTopicHook(Map)
*/
public static TopicHook createSchemaRegistryCleanUpHook(final EffectiveAppConfiguration<?> configuration) {
return createSchemaRegistryCleanUpHook(configuration.getKafkaProperties());
public static TopicHook createTopicHook(final EffectiveAppConfiguration<?> configuration) {
return createTopicHook(configuration.getKafkaProperties());
}

/**
* Register a hook that cleans up schemas associated with a topic
* @param cleanUpConfiguration Configuration to register hook on
* @param configuration Configuration to create hook from
* @return {@code StreamsCleanUpConfiguration} with registered topic hook
* @see SchemaRegistryAppUtils#createTopicHook(EffectiveAppConfiguration)
*/
public static <T> T registerTopicHook(
final HasTopicHooks<T> cleanUpConfiguration, final EffectiveAppConfiguration<?> configuration) {
return cleanUpConfiguration.registerTopicHook(
createTopicHook(configuration));
}

@RequiredArgsConstructor
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,24 +29,11 @@
*/
public interface SchemaRegistryProducerApp extends ProducerApp {

/**
* Register a hook that cleans up schemas associated with a topic
* @param cleanUpConfiguration Configuration to register hook on
* @param configuration Configuration to create hook from
* @return {@code ProducerCleanUpConfiguration} with registered topic hook
* @see SchemaRegistryKafkaApplicationUtils#createSchemaRegistryCleanUpHook(EffectiveAppConfiguration)
*/
static ProducerCleanUpConfiguration registerSchemaRegistryCleanUpHook(
final ProducerCleanUpConfiguration cleanUpConfiguration, final EffectiveAppConfiguration<?> configuration) {
return cleanUpConfiguration.registerTopicHook(
SchemaRegistryKafkaApplicationUtils.createSchemaRegistryCleanUpHook(configuration));
}

@Override
default ProducerCleanUpConfiguration setupCleanUp(
final EffectiveAppConfiguration<ProducerTopicConfig> configuration) {
final ProducerCleanUpConfiguration cleanUpConfiguration = ProducerApp.super.setupCleanUp(configuration);
return registerSchemaRegistryCleanUpHook(cleanUpConfiguration, configuration);
return SchemaRegistryAppUtils.registerTopicHook(cleanUpConfiguration, configuration);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -29,24 +29,11 @@
*/
public interface SchemaRegistryStreamsApp extends StreamsApp {

/**
* Register a hook that cleans up schemas associated with a topic
* @param cleanUpConfiguration Configuration to register hook on
* @param configuration Configuration to create hook from
* @return {@code StreamsCleanUpConfiguration} with registered topic hook
* @see SchemaRegistryKafkaApplicationUtils#createSchemaRegistryCleanUpHook(EffectiveAppConfiguration)
*/
static StreamsCleanUpConfiguration registerSchemaRegistryCleanUpHook(
final StreamsCleanUpConfiguration cleanUpConfiguration, final EffectiveAppConfiguration<?> configuration) {
return cleanUpConfiguration.registerTopicHook(
SchemaRegistryKafkaApplicationUtils.createSchemaRegistryCleanUpHook(configuration));
}

@Override
default StreamsCleanUpConfiguration setupCleanUp(
final EffectiveAppConfiguration<StreamsTopicConfig> configuration) {
final StreamsCleanUpConfiguration cleanUpConfiguration = StreamsApp.super.setupCleanUp(configuration);
return registerSchemaRegistryCleanUpHook(cleanUpConfiguration, configuration);
return SchemaRegistryAppUtils.registerTopicHook(cleanUpConfiguration, configuration);
}

}

0 comments on commit caaf0a0

Please sign in to comment.