From b20f76061949250748a558cd115d993fea25d27b Mon Sep 17 00:00:00 2001 From: Philipp Schirmer Date: Tue, 7 Jan 2025 11:21:21 +0100 Subject: [PATCH] Upgrade to Kafka 3.7 (#266) --- gradle.properties | 6 +++--- .../src/test/java/com/bakdata/kafka/CliTest.java | 6 +++--- .../com/bakdata/kafka/integration/RunProducerAppTest.java | 4 ++-- .../com/bakdata/kafka/integration/RunStreamsAppTest.java | 4 ++-- .../com/bakdata/kafka/integration/StreamsCleanUpTest.java | 4 ++-- .../test/java/com/bakdata/kafka/integration/KafkaTest.java | 4 ++-- .../java/com/bakdata/kafka/util/SchemaTopicClientTest.java | 4 ++-- .../test/java/com/bakdata/kafka/util/TopicClientTest.java | 4 ++-- .../java/com/bakdata/kafka/KafkaContainerHelper.java | 4 ++-- .../src/testFixtures/java/com/bakdata/kafka/TestUtil.java | 6 +++--- 10 files changed, 23 insertions(+), 23 deletions(-) diff --git a/gradle.properties b/gradle.properties index 01c84aa8..7fe37ee3 100644 --- a/gradle.properties +++ b/gradle.properties @@ -2,10 +2,10 @@ version=3.1.1-SNAPSHOT org.gradle.caching=true # running Kafka Streams in parallel causes problems with colliding consumer groups org.gradle.parallel=false -kafkaVersion=3.6.1 +kafkaVersion=3.7.1 testContainersVersion=1.20.4 -confluentVersion=7.6.0 -fluentKafkaVersion=2.14.0 +confluentVersion=7.7.0 +fluentKafkaVersion=2.15.0 junitVersion=5.10.2 mockitoVersion=5.11.0 assertJVersion=3.25.3 diff --git a/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/CliTest.java b/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/CliTest.java index efb7c9df..7912746e 100644 --- a/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/CliTest.java +++ b/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/CliTest.java @@ -40,7 +40,7 @@ import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.kstream.Consumed; import org.junit.jupiter.api.Test; -import org.testcontainers.kafka.ConfluentKafkaContainer; +import org.testcontainers.kafka.KafkaContainer; class CliTest { @@ -214,7 +214,7 @@ public SerdeConfig defaultSerializationConfig() { @ExpectSystemExitWithStatus(1) void shouldExitWithErrorInTopology() throws InterruptedException { final String input = "input"; - try (final ConfluentKafkaContainer kafkaCluster = newKafkaCluster(); + try (final KafkaContainer kafkaCluster = newKafkaCluster(); final KafkaStreamsApplication app = new SimpleKafkaStreamsApplication<>(() -> new StreamsApp() { @Override public void buildTopology(final TopologyBuilder builder) { @@ -251,7 +251,7 @@ public SerdeConfig defaultSerializationConfig() { void shouldExitWithSuccessCodeOnShutdown() { final String input = "input"; final String output = "output"; - try (final ConfluentKafkaContainer kafkaCluster = newKafkaCluster(); + try (final KafkaContainer kafkaCluster = newKafkaCluster(); final KafkaStreamsApplication app = new SimpleKafkaStreamsApplication<>(() -> new StreamsApp() { @Override public void buildTopology(final TopologyBuilder builder) { diff --git a/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/RunProducerAppTest.java b/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/RunProducerAppTest.java index 99a5ef60..d9d2ae56 100644 --- a/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/RunProducerAppTest.java +++ b/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/RunProducerAppTest.java @@ -52,7 +52,7 @@ import org.junit.jupiter.api.extension.RegisterExtension; import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Testcontainers; -import org.testcontainers.kafka.ConfluentKafkaContainer; +import org.testcontainers.kafka.KafkaContainer; @Testcontainers class RunProducerAppTest { @@ -60,7 +60,7 @@ class RunProducerAppTest { @RegisterExtension final SchemaRegistryMockExtension schemaRegistryMockExtension = new SchemaRegistryMockExtension(); @Container - private final ConfluentKafkaContainer kafkaCluster = newKafkaCluster(); + private final KafkaContainer kafkaCluster = newKafkaCluster(); @BeforeEach void setup() { diff --git a/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/RunStreamsAppTest.java b/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/RunStreamsAppTest.java index 62c52b27..c1b46273 100644 --- a/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/RunStreamsAppTest.java +++ b/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/RunStreamsAppTest.java @@ -47,14 +47,14 @@ import org.mockito.junit.jupiter.MockitoExtension; import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Testcontainers; -import org.testcontainers.kafka.ConfluentKafkaContainer; +import org.testcontainers.kafka.KafkaContainer; @Testcontainers @ExtendWith(MockitoExtension.class) class RunStreamsAppTest { private static final Duration TIMEOUT = Duration.ofSeconds(10); @Container - private final ConfluentKafkaContainer kafkaCluster = newKafkaCluster(); + private final KafkaContainer kafkaCluster = newKafkaCluster(); @Test void shouldRunApp() { diff --git a/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpTest.java b/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpTest.java index d62051ec..2771ef27 100644 --- a/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpTest.java +++ b/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpTest.java @@ -56,7 +56,7 @@ import org.mockito.quality.Strictness; import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Testcontainers; -import org.testcontainers.kafka.ConfluentKafkaContainer; +import org.testcontainers.kafka.KafkaContainer; @Testcontainers @Slf4j @@ -66,7 +66,7 @@ class StreamsCleanUpTest { private static final Duration TIMEOUT = Duration.ofSeconds(10); @Container - private final ConfluentKafkaContainer kafkaCluster = newKafkaCluster(); + private final KafkaContainer kafkaCluster = newKafkaCluster(); @InjectSoftAssertions private SoftAssertions softly; diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/KafkaTest.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/KafkaTest.java index 6a6e5d37..52eeb988 100644 --- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/KafkaTest.java +++ b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/KafkaTest.java @@ -31,14 +31,14 @@ import org.junit.jupiter.api.extension.RegisterExtension; import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Testcontainers; -import org.testcontainers.kafka.ConfluentKafkaContainer; +import org.testcontainers.kafka.KafkaContainer; @Testcontainers abstract class KafkaTest { @RegisterExtension final SchemaRegistryMockExtension schemaRegistryMockExtension = new SchemaRegistryMockExtension(); @Container - private final ConfluentKafkaContainer kafkaCluster = TestUtil.newKafkaCluster(); + private final KafkaContainer kafkaCluster = TestUtil.newKafkaCluster(); KafkaEndpointConfig createEndpointWithoutSchemaRegistry() { return KafkaEndpointConfig.builder() diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/util/SchemaTopicClientTest.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/util/SchemaTopicClientTest.java index 9472184e..909c85b7 100644 --- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/util/SchemaTopicClientTest.java +++ b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/util/SchemaTopicClientTest.java @@ -52,7 +52,7 @@ import org.junit.jupiter.api.extension.RegisterExtension; import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Testcontainers; -import org.testcontainers.kafka.ConfluentKafkaContainer; +import org.testcontainers.kafka.KafkaContainer; @Testcontainers @Slf4j @@ -63,7 +63,7 @@ class SchemaTopicClientTest { @RegisterExtension final SchemaRegistryMockExtension schemaRegistryMockExtension = new SchemaRegistryMockExtension(); @Container - private final ConfluentKafkaContainer kafkaCluster = newKafkaCluster(); + private final KafkaContainer kafkaCluster = newKafkaCluster(); @InjectSoftAssertions SoftAssertions softly; diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/util/TopicClientTest.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/util/TopicClientTest.java index d3111756..ac738b72 100644 --- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/util/TopicClientTest.java +++ b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/util/TopicClientTest.java @@ -35,14 +35,14 @@ import org.junit.jupiter.api.Test; import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Testcontainers; -import org.testcontainers.kafka.ConfluentKafkaContainer; +import org.testcontainers.kafka.KafkaContainer; @Testcontainers class TopicClientTest { private static final Duration CLIENT_TIMEOUT = Duration.ofSeconds(10L); @Container - private final ConfluentKafkaContainer kafkaCluster = newKafkaCluster(); + private final KafkaContainer kafkaCluster = newKafkaCluster(); @Test void shouldNotFindTopic() { diff --git a/streams-bootstrap-core/src/testFixtures/java/com/bakdata/kafka/KafkaContainerHelper.java b/streams-bootstrap-core/src/testFixtures/java/com/bakdata/kafka/KafkaContainerHelper.java index c1f4d8d8..51fa262e 100644 --- a/streams-bootstrap-core/src/testFixtures/java/com/bakdata/kafka/KafkaContainerHelper.java +++ b/streams-bootstrap-core/src/testFixtures/java/com/bakdata/kafka/KafkaContainerHelper.java @@ -49,7 +49,7 @@ import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.streams.KeyValue; -import org.testcontainers.kafka.ConfluentKafkaContainer; +import org.testcontainers.kafka.KafkaContainer; @RequiredArgsConstructor public class KafkaContainerHelper { @@ -58,7 +58,7 @@ public class KafkaContainerHelper { .partitions(1) .replicationFactor((short) 1) .build(); - private final @NonNull ConfluentKafkaContainer kafkaCluster; + private final @NonNull KafkaContainer kafkaCluster; private static List> pollAll(final Consumer consumer, final Duration timeout) { final List> records = new ArrayList<>(); diff --git a/streams-bootstrap-core/src/testFixtures/java/com/bakdata/kafka/TestUtil.java b/streams-bootstrap-core/src/testFixtures/java/com/bakdata/kafka/TestUtil.java index 1af3402d..4fb71585 100644 --- a/streams-bootstrap-core/src/testFixtures/java/com/bakdata/kafka/TestUtil.java +++ b/streams-bootstrap-core/src/testFixtures/java/com/bakdata/kafka/TestUtil.java @@ -25,12 +25,12 @@ package com.bakdata.kafka; import lombok.experimental.UtilityClass; -import org.testcontainers.kafka.ConfluentKafkaContainer; +import org.testcontainers.kafka.KafkaContainer; import org.testcontainers.utility.DockerImageName; @UtilityClass public class TestUtil { - public static ConfluentKafkaContainer newKafkaCluster() { - return new ConfluentKafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.6.0")); + public static KafkaContainer newKafkaCluster() { + return new KafkaContainer(DockerImageName.parse("apache/kafka:3.7.1")); } }