From 426cd9201b41fba38c65699aa8dc84861db401b4 Mon Sep 17 00:00:00 2001 From: Philipp Schirmer Date: Mon, 27 Jan 2025 13:04:38 +0100 Subject: [PATCH] Add error handling --- .../com/bakdata/kafka/TestApplicationHelper.java | 6 +++--- .../com/bakdata/kafka/TestApplicationRunner.java | 12 +++++------- .../bakdata/kafka/integration/RunStreamsAppTest.java | 2 +- .../kafka/integration/StreamsCleanUpTest.java | 2 +- .../test/java/com/bakdata/kafka/AvroMirrorTest.java | 2 +- .../java/com/bakdata/kafka/KafkaTest.java | 6 +++--- .../{SchemaRegistryEnv.java => TestEnvironment.java} | 10 +++++----- .../java/com/bakdata/kafka/TestTopologyFactory.java | 4 ++-- 8 files changed, 21 insertions(+), 23 deletions(-) rename streams-bootstrap-test/src/main/java/com/bakdata/kafka/{SchemaRegistryEnv.java => TestEnvironment.java} (92%) diff --git a/streams-bootstrap-cli-test/src/main/java/com/bakdata/kafka/TestApplicationHelper.java b/streams-bootstrap-cli-test/src/main/java/com/bakdata/kafka/TestApplicationHelper.java index 2091b80c..53487bcf 100644 --- a/streams-bootstrap-cli-test/src/main/java/com/bakdata/kafka/TestApplicationHelper.java +++ b/streams-bootstrap-cli-test/src/main/java/com/bakdata/kafka/TestApplicationHelper.java @@ -33,7 +33,7 @@ @RequiredArgsConstructor public final class TestApplicationHelper { - private final @NonNull SchemaRegistryEnv schemaRegistryEnv; + private final @NonNull TestEnvironment environment; public ConfiguredStreamsApp createConfiguredApp( final KafkaStreamsApplication app) { @@ -55,11 +55,11 @@ public TestTopology createTopologyExtension(final KafkaStreamsAppli } public void configure(final KafkaStreamsApplication app) { - app.setSchemaRegistryUrl(this.schemaRegistryEnv.getSchemaRegistryUrl()); + app.setSchemaRegistryUrl(this.environment.getSchemaRegistryUrl()); } private TestTopologyFactory createTestTopologyFactory() { - return new TestTopologyFactory(this.schemaRegistryEnv); + return new TestTopologyFactory(this.environment); } } diff --git a/streams-bootstrap-cli-test/src/main/java/com/bakdata/kafka/TestApplicationRunner.java b/streams-bootstrap-cli-test/src/main/java/com/bakdata/kafka/TestApplicationRunner.java index cd041def..f5f69f81 100644 --- a/streams-bootstrap-cli-test/src/main/java/com/bakdata/kafka/TestApplicationRunner.java +++ b/streams-bootstrap-cli-test/src/main/java/com/bakdata/kafka/TestApplicationRunner.java @@ -33,14 +33,13 @@ import lombok.Getter; import lombok.NonNull; import lombok.RequiredArgsConstructor; -import picocli.CommandLine; @Getter @RequiredArgsConstructor public final class TestApplicationRunner { private final @NonNull String bootstrapServers; - private final @NonNull SchemaRegistryEnv schemaRegistryEnv; + private final @NonNull TestEnvironment environment; public void run(final KafkaStreamsApplication app, final String[] args) { final String[] newArgs = this.setupArgs(args, emptyList()); @@ -79,7 +78,6 @@ public void reset(final KafkaStreamsApplication app) { public void prepareExecution(final KafkaStreamsApplication app) { this.configure(app); - new CommandLine(app); // initialize all mixins app.onApplicationStart(); } @@ -96,13 +94,13 @@ public ConsumerGroupVerifier verify(final KafkaStreamsApplication app) { app.setBootstrapServers(this.bootstrapServers); - app.setSchemaRegistryUrl(this.schemaRegistryEnv.getSchemaRegistryUrl()); + app.setSchemaRegistryUrl(this.environment.getSchemaRegistryUrl()); } private String[] setupArgs(final String[] args, final Iterable command) { @@ -110,8 +108,8 @@ private String[] setupArgs(final String[] args, final Iterable command) .add(args) .add("--bootstrap-servers", this.bootstrapServers) .addAll(command); - if (this.schemaRegistryEnv.getSchemaRegistryUrl() != null) { - argBuilder.add("--schema-registry-url", this.schemaRegistryEnv.getSchemaRegistryUrl()); + if (this.environment.getSchemaRegistryUrl() != null) { + argBuilder.add("--schema-registry-url", this.environment.getSchemaRegistryUrl()); } final List newArgs = argBuilder.build(); return newArgs.toArray(new String[0]); diff --git a/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/RunStreamsAppTest.java b/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/RunStreamsAppTest.java index 85188a20..221f4355 100644 --- a/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/RunStreamsAppTest.java +++ b/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/RunStreamsAppTest.java @@ -24,7 +24,7 @@ package com.bakdata.kafka.integration; -import static com.bakdata.kafka.SchemaRegistryEnv.withoutSchemaRegistry; +import static com.bakdata.kafka.TestEnvironment.withoutSchemaRegistry; import static org.assertj.core.api.Assertions.assertThat; import com.bakdata.kafka.KafkaStreamsApplication; diff --git a/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpTest.java b/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpTest.java index 3eeae5b7..c8af8d5e 100644 --- a/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpTest.java +++ b/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpTest.java @@ -25,7 +25,7 @@ package com.bakdata.kafka.integration; -import static com.bakdata.kafka.SchemaRegistryEnv.withoutSchemaRegistry; +import static com.bakdata.kafka.TestEnvironment.withoutSchemaRegistry; import com.bakdata.kafka.CloseFlagApp; import com.bakdata.kafka.KafkaStreamsApplication; diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/AvroMirrorTest.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/AvroMirrorTest.java index 3ddad2f2..471a3b31 100644 --- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/AvroMirrorTest.java +++ b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/AvroMirrorTest.java @@ -24,7 +24,7 @@ package com.bakdata.kafka; -import static com.bakdata.kafka.SchemaRegistryEnv.withSchemaRegistry; +import static com.bakdata.kafka.TestEnvironment.withSchemaRegistry; import com.bakdata.fluent_kafka_streams_tests.junit5.TestTopologyExtension; import com.bakdata.kafka.test_applications.MirrorWithNonDefaultSerde; diff --git a/streams-bootstrap-core/src/testFixtures/java/com/bakdata/kafka/KafkaTest.java b/streams-bootstrap-core/src/testFixtures/java/com/bakdata/kafka/KafkaTest.java index 3e8b094e..3993d57e 100644 --- a/streams-bootstrap-core/src/testFixtures/java/com/bakdata/kafka/KafkaTest.java +++ b/streams-bootstrap-core/src/testFixtures/java/com/bakdata/kafka/KafkaTest.java @@ -36,7 +36,7 @@ @Testcontainers public abstract class KafkaTest { protected static final Duration POLL_TIMEOUT = Duration.ofSeconds(10); - private final SchemaRegistryEnv schemaRegistryEnv = SchemaRegistryEnv.withSchemaRegistry(); + private final TestEnvironment environment = TestEnvironment.withSchemaRegistry(); @Container private final KafkaContainer kafkaCluster = newCluster(); @@ -73,11 +73,11 @@ protected KafkaTestClient newTestClient() { } protected String getSchemaRegistryUrl() { - return this.schemaRegistryEnv.getSchemaRegistryUrl(); + return this.environment.getSchemaRegistryUrl(); } protected SchemaRegistryClient getSchemaRegistryClient() { - return this.schemaRegistryEnv.getSchemaRegistryClient(); + return this.environment.getSchemaRegistryClient(); } protected void awaitProcessing(final ExecutableStreamsApp app) { diff --git a/streams-bootstrap-test/src/main/java/com/bakdata/kafka/SchemaRegistryEnv.java b/streams-bootstrap-test/src/main/java/com/bakdata/kafka/TestEnvironment.java similarity index 92% rename from streams-bootstrap-test/src/main/java/com/bakdata/kafka/SchemaRegistryEnv.java rename to streams-bootstrap-test/src/main/java/com/bakdata/kafka/TestEnvironment.java index 53438d25..e10110de 100644 --- a/streams-bootstrap-test/src/main/java/com/bakdata/kafka/SchemaRegistryEnv.java +++ b/streams-bootstrap-test/src/main/java/com/bakdata/kafka/TestEnvironment.java @@ -41,7 +41,7 @@ */ @RequiredArgsConstructor(access = AccessLevel.PRIVATE) @Getter -public final class SchemaRegistryEnv { +public final class TestEnvironment { private static final String MOCK_URL_PREFIX = "mock://"; private final String schemaRegistryUrl; @@ -50,7 +50,7 @@ public final class SchemaRegistryEnv { * Create a new {@code SchemaRegistryEnv} with no configured Schema Registry. * @return {@code SchemaRegistryEnv} with no configured Schema Registry */ - public static SchemaRegistryEnv withoutSchemaRegistry() { + public static TestEnvironment withoutSchemaRegistry() { return withSchemaRegistry(null); } @@ -60,7 +60,7 @@ public static SchemaRegistryEnv withoutSchemaRegistry() { * collisions between different test instances as scopes are retained globally. * @return {@code SchemaRegistryEnv} with configured Schema Registry */ - public static SchemaRegistryEnv withSchemaRegistry() { + public static TestEnvironment withSchemaRegistry() { return withSchemaRegistry(MOCK_URL_PREFIX + UUID.randomUUID()); } @@ -69,8 +69,8 @@ public static SchemaRegistryEnv withSchemaRegistry() { * @param schemaRegistryUrl Schema Registry URL to use * @return {@code SchemaRegistryEnv} with configured Schema Registry */ - public static SchemaRegistryEnv withSchemaRegistry(final String schemaRegistryUrl) { - return new SchemaRegistryEnv(schemaRegistryUrl); + public static TestEnvironment withSchemaRegistry(final String schemaRegistryUrl) { + return new TestEnvironment(schemaRegistryUrl); } /** diff --git a/streams-bootstrap-test/src/main/java/com/bakdata/kafka/TestTopologyFactory.java b/streams-bootstrap-test/src/main/java/com/bakdata/kafka/TestTopologyFactory.java index 9e4b8339..276b075b 100644 --- a/streams-bootstrap-test/src/main/java/com/bakdata/kafka/TestTopologyFactory.java +++ b/streams-bootstrap-test/src/main/java/com/bakdata/kafka/TestTopologyFactory.java @@ -47,7 +47,7 @@ public final class TestTopologyFactory { StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, Long.toString(0L), ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, Integer.toString(10_000) ); - private final @NonNull SchemaRegistryEnv schemaRegistryEnv; + private final @NonNull TestEnvironment environment; /** * Create {@code Configurator} to configure {@link org.apache.kafka.common.serialization.Serde} and @@ -131,7 +131,7 @@ public TestTopologyExtension createTopologyExtension( public Map getKafkaProperties(final ConfiguredStreamsApp app) { final KafkaEndpointConfig endpointConfig = KafkaEndpointConfig.builder() .bootstrapServers("localhost:9092") - .schemaRegistryUrl(this.schemaRegistryEnv.getSchemaRegistryUrl()) + .schemaRegistryUrl(this.environment.getSchemaRegistryUrl()) .build(); return app.getKafkaProperties(endpointConfig); }