Skip to content

Commit

Permalink
Make CleanUpRunner closeable (#247)
Browse files Browse the repository at this point in the history
  • Loading branch information
philipp94831 authored Jul 29, 2024
1 parent a88a343 commit d1bfa89
Show file tree
Hide file tree
Showing 14 changed files with 61 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,7 @@ public void close() {
*/
@Override
public void stop() {
this.cleanUpRunner.close();
this.app.close();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,11 @@
/**
* Cleans all resources associated with an application
*/
@FunctionalInterface
public interface CleanUpRunner {
public interface CleanUpRunner extends AutoCloseable {

@Override
void close();

/**
* Clean all resources associated with an application
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,18 @@ public interface HasTopicHooks<SELF> {
/**
* Hook for performing actions on topics
*/
interface TopicHook {
interface TopicHook extends AutoCloseable {
/**
* Called when a topic is deleted
* @param topic name of the topic
*/
default void deleted(final String topic) {
// do nothing
}

@Override
default void close() {
// do nothing
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@
* Provides configuration options for {@link ProducerCleanUpRunner}
*/
public class ProducerCleanUpConfiguration
implements HasTopicHooks<ProducerCleanUpConfiguration>, HasCleanHook<ProducerCleanUpConfiguration> {
implements HasTopicHooks<ProducerCleanUpConfiguration>, HasCleanHook<ProducerCleanUpConfiguration>,
AutoCloseable {
private final @NonNull Collection<TopicHook> topicHooks = new ArrayList<>();
private final @NonNull Collection<Runnable> cleanHooks = new ArrayList<>();

Expand All @@ -54,6 +55,11 @@ public ProducerCleanUpConfiguration registerCleanHook(final Runnable hook) {
return this;
}

@Override
public void close() {
this.topicHooks.forEach(TopicHook::close);
}

void runCleanHooks() {
this.cleanHooks.forEach(Runnable::run);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,11 @@ public static ProducerCleanUpRunner create(@NonNull final ProducerTopicConfig to
return new ProducerCleanUpRunner(topics, kafkaProperties, configuration);
}

@Override
public void close() {
this.cleanHooks.close();
}

/**
* Delete all output topics
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@
* Provides configuration options for {@link StreamsCleanUpRunner}
*/
public class StreamsCleanUpConfiguration
implements HasTopicHooks<StreamsCleanUpConfiguration>, HasCleanHook<StreamsCleanUpConfiguration> {
implements HasTopicHooks<StreamsCleanUpConfiguration>, HasCleanHook<StreamsCleanUpConfiguration>,
AutoCloseable {
private final @NonNull Collection<TopicHook> topicHooks = new ArrayList<>();
private final @NonNull Collection<Runnable> cleanHooks = new ArrayList<>();
private final @NonNull Collection<Runnable> resetHooks = new ArrayList<>();
Expand Down Expand Up @@ -65,6 +66,11 @@ public StreamsCleanUpConfiguration registerResetHook(final Runnable hook) {
return this;
}

@Override
public void close() {
this.topicHooks.forEach(TopicHook::close);
}

void runCleanHooks() {
this.cleanHooks.forEach(Runnable::run);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,11 @@ private static Collection<String> filterExistingTopics(final Collection<String>
.collect(Collectors.toList());
}

@Override
public void close() {
this.cleanHooks.close();
}

/**
* Clean up your Streams app by resetting the app and deleting the output topics
* and consumer group.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@

package com.bakdata.kafka.util;

import java.io.Closeable;
import java.time.Duration;
import java.util.Collection;
import java.util.List;
Expand All @@ -44,7 +43,7 @@
*/
@RequiredArgsConstructor
@Slf4j
public final class ConsumerGroupClient implements Closeable {
public final class ConsumerGroupClient implements AutoCloseable {

private final @NonNull Admin adminClient;
private final @NonNull Duration timeout;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@

import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.time.Duration;
Expand All @@ -47,7 +46,7 @@
* Provide methods for common operations when performing administrative actions on a Kafka cluster
*/
@Builder(access = AccessLevel.PRIVATE)
public final class ImprovedAdminClient implements Closeable {
public final class ImprovedAdminClient implements AutoCloseable {

private static final Duration ADMIN_TIMEOUT = Duration.ofSeconds(10L);
private final @NonNull Admin adminClient;
Expand Down Expand Up @@ -138,7 +137,7 @@ public void close() {

@RequiredArgsConstructor
private static class PooledSchemaRegistryClient implements SchemaRegistryClient {
@Delegate(excludes = Closeable.class)
@Delegate(excludes = AutoCloseable.class)
private final @NonNull SchemaRegistryClient schemaRegistryClient;

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClientFactory;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.time.Duration;
Expand All @@ -45,7 +44,7 @@
*/
@Slf4j
@RequiredArgsConstructor
public final class SchemaTopicClient implements Closeable {
public final class SchemaTopicClient implements AutoCloseable {
private static final int CACHE_CAPACITY = 100;
private final @NonNull TopicClient topicClient;
private final SchemaRegistryClient schemaRegistryClient;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@

package com.bakdata.kafka.util;

import java.io.Closeable;
import java.time.Duration;
import java.util.Collection;
import java.util.List;
Expand All @@ -48,7 +47,7 @@
*/
@RequiredArgsConstructor
@Slf4j
public final class TopicClient implements Closeable {
public final class TopicClient implements AutoCloseable {

private final @NonNull Admin adminClient;
private final @NonNull Duration timeout;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
* Utility class that provides helpers for cleaning {@code LargeMessageSerde} artifacts
*/
@UtilityClass
public class LargeMessageKafkaApplicationUtils {
public class LargeMessageAppUtils {
/**
* Create a hook that cleans up LargeMessage files associated with a topic. It is expected that all necessary
* properties to create a {@link AbstractLargeMessageConfig} are part of {@code kafkaProperties}.
Expand All @@ -41,9 +41,10 @@ public class LargeMessageKafkaApplicationUtils {
* @return hook that cleans up LargeMessage files associated with a topic
* @see HasTopicHooks#registerTopicHook(TopicHook)
*/
public static TopicHook createLargeMessageCleanUpHook(final Map<String, Object> kafkaProperties) {
public static TopicHook createTopicHook(final Map<String, Object> kafkaProperties) {
final AbstractLargeMessageConfig largeMessageConfig = new AbstractLargeMessageConfig(kafkaProperties);
final LargeMessageStoringClient storer = largeMessageConfig.getStorer();
//TODO: close storer once it implements AutoCloseable
return new TopicHook() {
@Override
public void deleted(final String topic) {
Expand All @@ -59,10 +60,22 @@ public void deleted(final String topic) {
*
* @param configuration Configuration to create hook from
* @return hook that cleans up LargeMessage files associated with a topic
* @see #createLargeMessageCleanUpHook(Map)
* @see #createTopicHook(Map)
*/
public static TopicHook createLargeMessageCleanUpHook(final EffectiveAppConfiguration<?> configuration) {
return createLargeMessageCleanUpHook(configuration.getKafkaProperties());
public static TopicHook createTopicHook(final EffectiveAppConfiguration<?> configuration) {
return createTopicHook(configuration.getKafkaProperties());
}

/**
* Register a hook that cleans up LargeMessage files associated with a topic
* @param cleanUpConfiguration Configuration to register hook on
* @param configuration Configuration to create hook from
* @param <T> type of configuration
* @return Configuration with registered topic hook
* @see LargeMessageAppUtils#createTopicHook(EffectiveAppConfiguration)
*/
public static <T> T registerTopicHook(
final HasTopicHooks<T> cleanUpConfiguration, final EffectiveAppConfiguration<?> configuration) {
return cleanUpConfiguration.registerTopicHook(createTopicHook(configuration));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,24 +29,11 @@
*/
public interface LargeMessageProducerApp extends ProducerApp {

/**
* Register a hook that cleans up LargeMessage files associated with a topic
* @param cleanUpConfiguration Configuration to register hook on
* @param configuration Configuration to create hook from
* @return {@code ProducerCleanUpConfiguration} with registered topic hook
* @see LargeMessageKafkaApplicationUtils#createLargeMessageCleanUpHook(EffectiveAppConfiguration)
*/
static ProducerCleanUpConfiguration registerLargeMessageCleanUpHook(
final ProducerCleanUpConfiguration cleanUpConfiguration, final EffectiveAppConfiguration<?> configuration) {
return cleanUpConfiguration.registerTopicHook(
LargeMessageKafkaApplicationUtils.createLargeMessageCleanUpHook(configuration));
}

@Override
default ProducerCleanUpConfiguration setupCleanUp(
final EffectiveAppConfiguration<ProducerTopicConfig> configuration) {
final ProducerCleanUpConfiguration cleanUpConfiguration = ProducerApp.super.setupCleanUp(configuration);
return registerLargeMessageCleanUpHook(cleanUpConfiguration, configuration);
return LargeMessageAppUtils.registerTopicHook(cleanUpConfiguration, configuration);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -29,24 +29,11 @@
*/
public interface LargeMessageStreamsApp extends StreamsApp {

/**
* Register a hook that cleans up LargeMessage files associated with a topic
* @param cleanUpConfiguration Configuration to register hook on
* @param configuration Configuration to create hook from
* @return {@code StreamsCleanUpConfiguration} with registered topic hook
* @see LargeMessageKafkaApplicationUtils#createLargeMessageCleanUpHook(EffectiveAppConfiguration)
*/
static StreamsCleanUpConfiguration registerLargeMessageCleanUpHook(
final StreamsCleanUpConfiguration cleanUpConfiguration, final EffectiveAppConfiguration<?> configuration) {
return cleanUpConfiguration.registerTopicHook(
LargeMessageKafkaApplicationUtils.createLargeMessageCleanUpHook(configuration));
}

@Override
default StreamsCleanUpConfiguration setupCleanUp(
final EffectiveAppConfiguration<StreamsTopicConfig> configuration) {
final StreamsCleanUpConfiguration cleanUpConfiguration = StreamsApp.super.setupCleanUp(configuration);
return registerLargeMessageCleanUpHook(cleanUpConfiguration, configuration);
return LargeMessageAppUtils.registerTopicHook(cleanUpConfiguration, configuration);
}

}

0 comments on commit d1bfa89

Please sign in to comment.