Skip to content

Commit

Permalink
Add error handling
Browse files Browse the repository at this point in the history
  • Loading branch information
philipp94831 committed Jan 27, 2025
1 parent f16dd10 commit 426cd92
Show file tree
Hide file tree
Showing 8 changed files with 21 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
@RequiredArgsConstructor
public final class TestApplicationHelper {

private final @NonNull SchemaRegistryEnv schemaRegistryEnv;
private final @NonNull TestEnvironment environment;

public ConfiguredStreamsApp<? extends StreamsApp> createConfiguredApp(
final KafkaStreamsApplication<? extends StreamsApp> app) {
Expand All @@ -55,11 +55,11 @@ public <K, V> TestTopology<K, V> createTopologyExtension(final KafkaStreamsAppli
}

public void configure(final KafkaStreamsApplication<? extends StreamsApp> app) {
app.setSchemaRegistryUrl(this.schemaRegistryEnv.getSchemaRegistryUrl());
app.setSchemaRegistryUrl(this.environment.getSchemaRegistryUrl());
}

private TestTopologyFactory createTestTopologyFactory() {
return new TestTopologyFactory(this.schemaRegistryEnv);
return new TestTopologyFactory(this.environment);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,13 @@
import lombok.Getter;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import picocli.CommandLine;

@Getter
@RequiredArgsConstructor
public final class TestApplicationRunner {

private final @NonNull String bootstrapServers;
private final @NonNull SchemaRegistryEnv schemaRegistryEnv;
private final @NonNull TestEnvironment environment;

public void run(final KafkaStreamsApplication<? extends StreamsApp> app, final String[] args) {
final String[] newArgs = this.setupArgs(args, emptyList());
Expand Down Expand Up @@ -79,7 +78,6 @@ public void reset(final KafkaStreamsApplication<? extends StreamsApp> app) {

public void prepareExecution(final KafkaStreamsApplication<? extends StreamsApp> app) {
this.configure(app);
new CommandLine(app); // initialize all mixins
app.onApplicationStart();
}

Expand All @@ -96,22 +94,22 @@ public ConsumerGroupVerifier verify(final KafkaStreamsApplication<? extends Stre
public KafkaTestClient newTestClient() {
return new KafkaTestClient(KafkaEndpointConfig.builder()
.bootstrapServers(this.bootstrapServers)
.schemaRegistryUrl(this.schemaRegistryEnv.getSchemaRegistryUrl())
.schemaRegistryUrl(this.environment.getSchemaRegistryUrl())
.build());
}

public void configure(final KafkaStreamsApplication<? extends StreamsApp> app) {
app.setBootstrapServers(this.bootstrapServers);
app.setSchemaRegistryUrl(this.schemaRegistryEnv.getSchemaRegistryUrl());
app.setSchemaRegistryUrl(this.environment.getSchemaRegistryUrl());
}

private String[] setupArgs(final String[] args, final Iterable<String> command) {
final Builder<String> argBuilder = ImmutableList.<String>builder()
.add(args)
.add("--bootstrap-servers", this.bootstrapServers)
.addAll(command);
if (this.schemaRegistryEnv.getSchemaRegistryUrl() != null) {
argBuilder.add("--schema-registry-url", this.schemaRegistryEnv.getSchemaRegistryUrl());
if (this.environment.getSchemaRegistryUrl() != null) {
argBuilder.add("--schema-registry-url", this.environment.getSchemaRegistryUrl());
}
final List<String> newArgs = argBuilder.build();
return newArgs.toArray(new String[0]);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

package com.bakdata.kafka.integration;

import static com.bakdata.kafka.SchemaRegistryEnv.withoutSchemaRegistry;
import static com.bakdata.kafka.TestEnvironment.withoutSchemaRegistry;
import static org.assertj.core.api.Assertions.assertThat;

import com.bakdata.kafka.KafkaStreamsApplication;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
package com.bakdata.kafka.integration;


import static com.bakdata.kafka.SchemaRegistryEnv.withoutSchemaRegistry;
import static com.bakdata.kafka.TestEnvironment.withoutSchemaRegistry;

import com.bakdata.kafka.CloseFlagApp;
import com.bakdata.kafka.KafkaStreamsApplication;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

package com.bakdata.kafka;

import static com.bakdata.kafka.SchemaRegistryEnv.withSchemaRegistry;
import static com.bakdata.kafka.TestEnvironment.withSchemaRegistry;

import com.bakdata.fluent_kafka_streams_tests.junit5.TestTopologyExtension;
import com.bakdata.kafka.test_applications.MirrorWithNonDefaultSerde;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
@Testcontainers
public abstract class KafkaTest {
protected static final Duration POLL_TIMEOUT = Duration.ofSeconds(10);
private final SchemaRegistryEnv schemaRegistryEnv = SchemaRegistryEnv.withSchemaRegistry();
private final TestEnvironment environment = TestEnvironment.withSchemaRegistry();
@Container
private final KafkaContainer kafkaCluster = newCluster();

Expand Down Expand Up @@ -73,11 +73,11 @@ protected KafkaTestClient newTestClient() {
}

protected String getSchemaRegistryUrl() {
return this.schemaRegistryEnv.getSchemaRegistryUrl();
return this.environment.getSchemaRegistryUrl();
}

protected SchemaRegistryClient getSchemaRegistryClient() {
return this.schemaRegistryEnv.getSchemaRegistryClient();
return this.environment.getSchemaRegistryClient();
}

protected void awaitProcessing(final ExecutableStreamsApp<?> app) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
*/
@RequiredArgsConstructor(access = AccessLevel.PRIVATE)
@Getter
public final class SchemaRegistryEnv {
public final class TestEnvironment {

private static final String MOCK_URL_PREFIX = "mock://";
private final String schemaRegistryUrl;
Expand All @@ -50,7 +50,7 @@ public final class SchemaRegistryEnv {
* Create a new {@code SchemaRegistryEnv} with no configured Schema Registry.
* @return {@code SchemaRegistryEnv} with no configured Schema Registry
*/
public static SchemaRegistryEnv withoutSchemaRegistry() {
public static TestEnvironment withoutSchemaRegistry() {
return withSchemaRegistry(null);
}

Expand All @@ -60,7 +60,7 @@ public static SchemaRegistryEnv withoutSchemaRegistry() {
* collisions between different test instances as scopes are retained globally.
* @return {@code SchemaRegistryEnv} with configured Schema Registry
*/
public static SchemaRegistryEnv withSchemaRegistry() {
public static TestEnvironment withSchemaRegistry() {
return withSchemaRegistry(MOCK_URL_PREFIX + UUID.randomUUID());
}

Expand All @@ -69,8 +69,8 @@ public static SchemaRegistryEnv withSchemaRegistry() {
* @param schemaRegistryUrl Schema Registry URL to use
* @return {@code SchemaRegistryEnv} with configured Schema Registry
*/
public static SchemaRegistryEnv withSchemaRegistry(final String schemaRegistryUrl) {
return new SchemaRegistryEnv(schemaRegistryUrl);
public static TestEnvironment withSchemaRegistry(final String schemaRegistryUrl) {
return new TestEnvironment(schemaRegistryUrl);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public final class TestTopologyFactory {
StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, Long.toString(0L),
ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, Integer.toString(10_000)
);
private final @NonNull SchemaRegistryEnv schemaRegistryEnv;
private final @NonNull TestEnvironment environment;

/**
* Create {@code Configurator} to configure {@link org.apache.kafka.common.serialization.Serde} and
Expand Down Expand Up @@ -131,7 +131,7 @@ public <K, V> TestTopologyExtension<K, V> createTopologyExtension(
public Map<String, Object> getKafkaProperties(final ConfiguredStreamsApp<? extends StreamsApp> app) {
final KafkaEndpointConfig endpointConfig = KafkaEndpointConfig.builder()
.bootstrapServers("localhost:9092")
.schemaRegistryUrl(this.schemaRegistryEnv.getSchemaRegistryUrl())
.schemaRegistryUrl(this.environment.getSchemaRegistryUrl())
.build();
return app.getKafkaProperties(endpointConfig);
}
Expand Down

0 comments on commit 426cd92

Please sign in to comment.