Skip to content

Commit

Permalink
Fix Sonarqube issues
Browse files Browse the repository at this point in the history
  • Loading branch information
philipp94831 committed Aug 7, 2024
1 parent 8b34869 commit 025ad1d
Show file tree
Hide file tree
Showing 5 changed files with 46 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,8 @@ private interface Stoppable {

/**
* Provides access to a {@link CleanUpRunner} and closes the associated {@link ExecutableApp}
*
* @param <CR> type of {@link CleanUpRunner} used by this app
*/
@RequiredArgsConstructor(access = AccessLevel.PROTECTED)
public static class CleanableApp<CR extends CleanUpRunner> implements AutoCloseable, Stoppable {
Expand All @@ -349,6 +351,8 @@ public void stop() {

/**
* Provides access to a {@link Runner} and closes the associated {@link ExecutableApp}
*
* @param <R> type of {@link Runner} used by this app
*/
@RequiredArgsConstructor(access = AccessLevel.PROTECTED)
public static final class RunnableApp<R extends Runner> implements AutoCloseable, Stoppable {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public void deleteConsumerGroup(final String groupName) {
this.adminClient.deleteConsumerGroups(List.of(groupName))
.all()
.get(this.timeout.toSeconds(), TimeUnit.SECONDS);
log.info("Deleted consumer group'{}'", groupName);
log.info("Deleted consumer group '{}'", groupName);
} catch (final InterruptedException ex) {
Thread.currentThread().interrupt();
throw new KafkaAdminException("Failed to delete consumer group " + groupName, ex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,10 @@ void shouldThrowIfKeySerializerHasBeenConfiguredDifferently() {
));
final ConfiguredProducerApp<ProducerApp> configuredApp =
new ConfiguredProducerApp<>(new TestProducer(), configuration);
assertThatThrownBy(() -> configuredApp.getKafkaProperties(KafkaEndpointConfig.builder()
final KafkaEndpointConfig endpointConfig = KafkaEndpointConfig.builder()
.bootstrapServers("fake")
.build()))
.build();
assertThatThrownBy(() -> configuredApp.getKafkaProperties(endpointConfig))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("'key.serializer' should not be configured already");
}
Expand All @@ -112,9 +113,10 @@ void shouldThrowIfValueSerializerHasBeenConfiguredDifferently() {
));
final ConfiguredProducerApp<ProducerApp> configuredApp =
new ConfiguredProducerApp<>(new TestProducer(), configuration);
assertThatThrownBy(() -> configuredApp.getKafkaProperties(KafkaEndpointConfig.builder()
final KafkaEndpointConfig endpointConfig = KafkaEndpointConfig.builder()
.bootstrapServers("fake")
.build()))
.build();
assertThatThrownBy(() -> configuredApp.getKafkaProperties(endpointConfig))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("'value.serializer' should not be configured already");
}
Expand All @@ -126,9 +128,10 @@ void shouldThrowIfBootstrapServersHasBeenConfiguredDifferently() {
));
final ConfiguredProducerApp<ProducerApp> configuredApp =
new ConfiguredProducerApp<>(new TestProducer(), configuration);
assertThatThrownBy(() -> configuredApp.getKafkaProperties(KafkaEndpointConfig.builder()
final KafkaEndpointConfig endpointConfig = KafkaEndpointConfig.builder()
.bootstrapServers("fake")
.build()))
.build();
assertThatThrownBy(() -> configuredApp.getKafkaProperties(endpointConfig))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("'bootstrap.servers' should not be configured already");
}
Expand All @@ -140,10 +143,11 @@ void shouldThrowIfSchemaRegistryHasBeenConfiguredDifferently() {
));
final ConfiguredProducerApp<ProducerApp> configuredApp =
new ConfiguredProducerApp<>(new TestProducer(), configuration);
assertThatThrownBy(() -> configuredApp.getKafkaProperties(KafkaEndpointConfig.builder()
final KafkaEndpointConfig endpointConfig = KafkaEndpointConfig.builder()
.bootstrapServers("fake")
.schemaRegistryUrl("fake")
.build()))
.build();
assertThatThrownBy(() -> configuredApp.getKafkaProperties(endpointConfig))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("'schema.registry.url' should not be configured already");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,10 @@ void shouldThrowIfKeySerdeHasBeenConfiguredDifferently() {
));
final ConfiguredStreamsApp<StreamsApp> configuredApp =
new ConfiguredStreamsApp<>(new TestApplication(), configuration);
assertThatThrownBy(() -> configuredApp.getKafkaProperties(KafkaEndpointConfig.builder()
final KafkaEndpointConfig endpointConfig = KafkaEndpointConfig.builder()
.bootstrapServers("fake")
.build()))
.build();
assertThatThrownBy(() -> configuredApp.getKafkaProperties(endpointConfig))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("'default.key.serde' should not be configured already");
}
Expand All @@ -112,9 +113,10 @@ void shouldThrowIfValueSerdeHasBeenConfiguredDifferently() {
));
final ConfiguredStreamsApp<StreamsApp> configuredApp =
new ConfiguredStreamsApp<>(new TestApplication(), configuration);
assertThatThrownBy(() -> configuredApp.getKafkaProperties(KafkaEndpointConfig.builder()
final KafkaEndpointConfig endpointConfig = KafkaEndpointConfig.builder()
.bootstrapServers("fake")
.build()))
.build();
assertThatThrownBy(() -> configuredApp.getKafkaProperties(endpointConfig))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("'default.value.serde' should not be configured already");
}
Expand All @@ -126,9 +128,10 @@ void shouldThrowIfAppIdHasBeenConfiguredDifferently() {
));
final ConfiguredStreamsApp<StreamsApp> configuredApp =
new ConfiguredStreamsApp<>(new TestApplication(), configuration);
assertThatThrownBy(() -> configuredApp.getKafkaProperties(KafkaEndpointConfig.builder()
final KafkaEndpointConfig endpointConfig = KafkaEndpointConfig.builder()
.bootstrapServers("fake")
.build()))
.build();
assertThatThrownBy(() -> configuredApp.getKafkaProperties(endpointConfig))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("'application.id' should not be configured already");
}
Expand All @@ -140,9 +143,10 @@ void shouldThrowIfBootstrapServersHasBeenConfiguredDifferently() {
));
final ConfiguredStreamsApp<StreamsApp> configuredApp =
new ConfiguredStreamsApp<>(new TestApplication(), configuration);
assertThatThrownBy(() -> configuredApp.getKafkaProperties(KafkaEndpointConfig.builder()
final KafkaEndpointConfig endpointConfig = KafkaEndpointConfig.builder()
.bootstrapServers("fake")
.build()))
.build();
assertThatThrownBy(() -> configuredApp.getKafkaProperties(endpointConfig))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("'bootstrap.servers' should not be configured already");
}
Expand All @@ -154,10 +158,11 @@ void shouldThrowIfSchemaRegistryHasBeenConfiguredDifferently() {
));
final ConfiguredStreamsApp<StreamsApp> configuredApp =
new ConfiguredStreamsApp<>(new TestApplication(), configuration);
assertThatThrownBy(() -> configuredApp.getKafkaProperties(KafkaEndpointConfig.builder()
final KafkaEndpointConfig endpointConfig = KafkaEndpointConfig.builder()
.bootstrapServers("fake")
.schemaRegistryUrl("fake")
.build()))
.build();
assertThatThrownBy(() -> configuredApp.getKafkaProperties(endpointConfig))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("'schema.registry.url' should not be configured already");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@

import com.bakdata.kafka.HasTopicHooks.TopicHook;
import java.util.Map;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import lombok.experimental.UtilityClass;

/**
Expand All @@ -44,13 +46,7 @@ public class LargeMessageAppUtils {
public static TopicHook createTopicHook(final Map<String, Object> kafkaProperties) {
final AbstractLargeMessageConfig largeMessageConfig = new AbstractLargeMessageConfig(kafkaProperties);
final LargeMessageStoringClient storer = largeMessageConfig.getStorer();
//TODO: close storer once it implements AutoCloseable
return new TopicHook() {
@Override
public void deleted(final String topic) {
storer.deleteAllFiles(topic);
}
};
return new LargeMessageTopicHook(storer);
}

/**
Expand Down Expand Up @@ -78,4 +74,15 @@ public static <T> T registerTopicHook(
final HasTopicHooks<T> cleanUpConfiguration, final EffectiveAppConfiguration<?> configuration) {
return cleanUpConfiguration.registerTopicHook(createTopicHook(configuration));
}

@RequiredArgsConstructor
private static class LargeMessageTopicHook implements TopicHook {
//TODO: close storer once it implements AutoCloseable
private final @NonNull LargeMessageStoringClient storer;

@Override
public void deleted(final String topic) {
this.storer.deleteAllFiles(topic);
}
}
}

0 comments on commit 025ad1d

Please sign in to comment.