Skip to content

Commit

Permalink
Upgrade to Kafka 3.7
Browse files Browse the repository at this point in the history
  • Loading branch information
philipp94831 committed Jan 7, 2025
1 parent ccf8cd9 commit 4d9004c
Show file tree
Hide file tree
Showing 10 changed files with 23 additions and 23 deletions.
6 changes: 3 additions & 3 deletions gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@ version=3.1.1-SNAPSHOT
org.gradle.caching=true
# running Kafka Streams in parallel causes problems with colliding consumer groups
org.gradle.parallel=false
kafkaVersion=3.6.1
kafkaVersion=3.7.1
testContainersVersion=1.20.4
confluentVersion=7.6.0
fluentKafkaVersion=2.14.0
confluentVersion=7.7.0
fluentKafkaVersion=2.15.0
junitVersion=5.10.2
mockitoVersion=5.11.0
assertJVersion=3.25.3
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Consumed;
import org.junit.jupiter.api.Test;
import org.testcontainers.kafka.ConfluentKafkaContainer;
import org.testcontainers.kafka.KafkaContainer;

class CliTest {

Expand Down Expand Up @@ -214,7 +214,7 @@ public SerdeConfig defaultSerializationConfig() {
@ExpectSystemExitWithStatus(1)
void shouldExitWithErrorInTopology() throws InterruptedException {
final String input = "input";
try (final ConfluentKafkaContainer kafkaCluster = newKafkaCluster();
try (final KafkaContainer kafkaCluster = newKafkaCluster();
final KafkaStreamsApplication<?> app = new SimpleKafkaStreamsApplication<>(() -> new StreamsApp() {
@Override
public void buildTopology(final TopologyBuilder builder) {
Expand Down Expand Up @@ -251,7 +251,7 @@ public SerdeConfig defaultSerializationConfig() {
void shouldExitWithSuccessCodeOnShutdown() {
final String input = "input";
final String output = "output";
try (final ConfluentKafkaContainer kafkaCluster = newKafkaCluster();
try (final KafkaContainer kafkaCluster = newKafkaCluster();
final KafkaStreamsApplication<?> app = new SimpleKafkaStreamsApplication<>(() -> new StreamsApp() {
@Override
public void buildTopology(final TopologyBuilder builder) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,15 +52,15 @@
import org.junit.jupiter.api.extension.RegisterExtension;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.kafka.ConfluentKafkaContainer;
import org.testcontainers.kafka.KafkaContainer;

@Testcontainers
class RunProducerAppTest {
private static final Duration TIMEOUT = Duration.ofSeconds(10);
@RegisterExtension
final SchemaRegistryMockExtension schemaRegistryMockExtension = new SchemaRegistryMockExtension();
@Container
private final ConfluentKafkaContainer kafkaCluster = newKafkaCluster();
private final KafkaContainer kafkaCluster = newKafkaCluster();

@BeforeEach
void setup() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,14 @@
import org.mockito.junit.jupiter.MockitoExtension;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.kafka.ConfluentKafkaContainer;
import org.testcontainers.kafka.KafkaContainer;

@Testcontainers
@ExtendWith(MockitoExtension.class)
class RunStreamsAppTest {
private static final Duration TIMEOUT = Duration.ofSeconds(10);
@Container
private final ConfluentKafkaContainer kafkaCluster = newKafkaCluster();
private final KafkaContainer kafkaCluster = newKafkaCluster();

@Test
void shouldRunApp() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@
import org.mockito.quality.Strictness;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.kafka.ConfluentKafkaContainer;
import org.testcontainers.kafka.KafkaContainer;

@Testcontainers
@Slf4j
Expand All @@ -66,7 +66,7 @@
class StreamsCleanUpTest {
private static final Duration TIMEOUT = Duration.ofSeconds(10);
@Container
private final ConfluentKafkaContainer kafkaCluster = newKafkaCluster();
private final KafkaContainer kafkaCluster = newKafkaCluster();
@InjectSoftAssertions
private SoftAssertions softly;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,14 @@
import org.junit.jupiter.api.extension.RegisterExtension;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.kafka.ConfluentKafkaContainer;
import org.testcontainers.kafka.KafkaContainer;

@Testcontainers
abstract class KafkaTest {
@RegisterExtension
final SchemaRegistryMockExtension schemaRegistryMockExtension = new SchemaRegistryMockExtension();
@Container
private final ConfluentKafkaContainer kafkaCluster = TestUtil.newKafkaCluster();
private final KafkaContainer kafkaCluster = TestUtil.newKafkaCluster();

KafkaEndpointConfig createEndpointWithoutSchemaRegistry() {
return KafkaEndpointConfig.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@
import org.junit.jupiter.api.extension.RegisterExtension;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.kafka.ConfluentKafkaContainer;
import org.testcontainers.kafka.KafkaContainer;

@Testcontainers
@Slf4j
Expand All @@ -63,7 +63,7 @@ class SchemaTopicClientTest {
@RegisterExtension
final SchemaRegistryMockExtension schemaRegistryMockExtension = new SchemaRegistryMockExtension();
@Container
private final ConfluentKafkaContainer kafkaCluster = newKafkaCluster();
private final KafkaContainer kafkaCluster = newKafkaCluster();

@InjectSoftAssertions
SoftAssertions softly;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,14 @@
import org.junit.jupiter.api.Test;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.kafka.ConfluentKafkaContainer;
import org.testcontainers.kafka.KafkaContainer;

@Testcontainers
class TopicClientTest {

private static final Duration CLIENT_TIMEOUT = Duration.ofSeconds(10L);
@Container
private final ConfluentKafkaContainer kafkaCluster = newKafkaCluster();
private final KafkaContainer kafkaCluster = newKafkaCluster();

@Test
void shouldNotFindTopic() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KeyValue;
import org.testcontainers.kafka.ConfluentKafkaContainer;
import org.testcontainers.kafka.KafkaContainer;

@RequiredArgsConstructor
public class KafkaContainerHelper {
Expand All @@ -58,7 +58,7 @@ public class KafkaContainerHelper {
.partitions(1)
.replicationFactor((short) 1)
.build();
private final @NonNull ConfluentKafkaContainer kafkaCluster;
private final @NonNull KafkaContainer kafkaCluster;

private static <K, V> List<ConsumerRecord<K, V>> pollAll(final Consumer<K, V> consumer, final Duration timeout) {
final List<ConsumerRecord<K, V>> records = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,12 @@
package com.bakdata.kafka;

import lombok.experimental.UtilityClass;
import org.testcontainers.kafka.ConfluentKafkaContainer;
import org.testcontainers.kafka.KafkaContainer;
import org.testcontainers.utility.DockerImageName;

@UtilityClass
public class TestUtil {
public static ConfluentKafkaContainer newKafkaCluster() {
return new ConfluentKafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.6.0"));
public static KafkaContainer newKafkaCluster() {
return new KafkaContainer(DockerImageName.parse("apache/kafka:3.7.1"));
}
}

0 comments on commit 4d9004c

Please sign in to comment.