Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/v3' into feature/remove-schema-r…
Browse files Browse the repository at this point in the history
…egistry

# Conflicts:
#	streams-bootstrap-core/src/main/java/com/bakdata/kafka/util/ImprovedAdminClient.java
#	streams-bootstrap-core/src/main/java/com/bakdata/kafka/util/SchemaTopicClient.java
  • Loading branch information
philipp94831 committed Jul 29, 2024
2 parents caaf0a0 + d1bfa89 commit 363e9c8
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
* Utility class that provides helpers for cleaning {@code LargeMessageSerde} artifacts
*/
@UtilityClass
public class LargeMessageKafkaApplicationUtils {
public class LargeMessageAppUtils {
/**
* Create a hook that cleans up LargeMessage files associated with a topic. It is expected that all necessary
* properties to create a {@link AbstractLargeMessageConfig} are part of {@code kafkaProperties}.
Expand All @@ -41,9 +41,10 @@ public class LargeMessageKafkaApplicationUtils {
* @return hook that cleans up LargeMessage files associated with a topic
* @see HasTopicHooks#registerTopicHook(TopicHook)
*/
public static TopicHook createLargeMessageCleanUpHook(final Map<String, Object> kafkaProperties) {
public static TopicHook createTopicHook(final Map<String, Object> kafkaProperties) {
final AbstractLargeMessageConfig largeMessageConfig = new AbstractLargeMessageConfig(kafkaProperties);
final LargeMessageStoringClient storer = largeMessageConfig.getStorer();
//TODO: close storer once it implements AutoCloseable
return new TopicHook() {
@Override
public void deleted(final String topic) {
Expand All @@ -59,10 +60,22 @@ public void deleted(final String topic) {
*
* @param configuration Configuration to create hook from
* @return hook that cleans up LargeMessage files associated with a topic
* @see #createLargeMessageCleanUpHook(Map)
* @see #createTopicHook(Map)
*/
public static TopicHook createLargeMessageCleanUpHook(final EffectiveAppConfiguration<?> configuration) {
return createLargeMessageCleanUpHook(configuration.getKafkaProperties());
public static TopicHook createTopicHook(final EffectiveAppConfiguration<?> configuration) {
return createTopicHook(configuration.getKafkaProperties());
}

/**
* Register a hook that cleans up LargeMessage files associated with a topic
* @param cleanUpConfiguration Configuration to register hook on
* @param configuration Configuration to create hook from
* @param <T> type of configuration
* @return Configuration with registered topic hook
* @see LargeMessageAppUtils#createTopicHook(EffectiveAppConfiguration)
*/
public static <T> T registerTopicHook(
final HasTopicHooks<T> cleanUpConfiguration, final EffectiveAppConfiguration<?> configuration) {
return cleanUpConfiguration.registerTopicHook(createTopicHook(configuration));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,24 +29,11 @@
*/
public interface LargeMessageProducerApp extends ProducerApp {

/**
* Register a hook that cleans up LargeMessage files associated with a topic
* @param cleanUpConfiguration Configuration to register hook on
* @param configuration Configuration to create hook from
* @return {@code ProducerCleanUpConfiguration} with registered topic hook
* @see LargeMessageKafkaApplicationUtils#createLargeMessageCleanUpHook(EffectiveAppConfiguration)
*/
static ProducerCleanUpConfiguration registerLargeMessageCleanUpHook(
final ProducerCleanUpConfiguration cleanUpConfiguration, final EffectiveAppConfiguration<?> configuration) {
return cleanUpConfiguration.registerTopicHook(
LargeMessageKafkaApplicationUtils.createLargeMessageCleanUpHook(configuration));
}

@Override
default ProducerCleanUpConfiguration setupCleanUp(
final EffectiveAppConfiguration<ProducerTopicConfig> configuration) {
final ProducerCleanUpConfiguration cleanUpConfiguration = ProducerApp.super.setupCleanUp(configuration);
return registerLargeMessageCleanUpHook(cleanUpConfiguration, configuration);
return LargeMessageAppUtils.registerTopicHook(cleanUpConfiguration, configuration);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -29,24 +29,11 @@
*/
public interface LargeMessageStreamsApp extends StreamsApp {

/**
* Register a hook that cleans up LargeMessage files associated with a topic
* @param cleanUpConfiguration Configuration to register hook on
* @param configuration Configuration to create hook from
* @return {@code StreamsCleanUpConfiguration} with registered topic hook
* @see LargeMessageKafkaApplicationUtils#createLargeMessageCleanUpHook(EffectiveAppConfiguration)
*/
static StreamsCleanUpConfiguration registerLargeMessageCleanUpHook(
final StreamsCleanUpConfiguration cleanUpConfiguration, final EffectiveAppConfiguration<?> configuration) {
return cleanUpConfiguration.registerTopicHook(
LargeMessageKafkaApplicationUtils.createLargeMessageCleanUpHook(configuration));
}

@Override
default StreamsCleanUpConfiguration setupCleanUp(
final EffectiveAppConfiguration<StreamsTopicConfig> configuration) {
final StreamsCleanUpConfiguration cleanUpConfiguration = StreamsApp.super.setupCleanUp(configuration);
return registerLargeMessageCleanUpHook(cleanUpConfiguration, configuration);
return LargeMessageAppUtils.registerTopicHook(cleanUpConfiguration, configuration);
}

}

0 comments on commit 363e9c8

Please sign in to comment.