Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upgrade to Kafka 3.7 #266

Merged
14 commits merged on Jan 7, 2025
6 changes: 3 additions & 3 deletions gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@ version=3.1.1-SNAPSHOT
org.gradle.caching=true
# running Kafka Streams in parallel causes problems with colliding consumer groups
org.gradle.parallel=false
kafkaVersion=3.6.1
kafkaVersion=3.7.1
testContainersVersion=1.20.4
confluentVersion=7.6.0
fluentKafkaVersion=2.14.0
confluentVersion=7.7.0
fluentKafkaVersion=2.15.0
junitVersion=5.10.2
mockitoVersion=5.11.0
assertJVersion=3.25.3
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Consumed;
import org.junit.jupiter.api.Test;
import org.testcontainers.kafka.ConfluentKafkaContainer;
import org.testcontainers.kafka.KafkaContainer;

class CliTest {

Expand Down Expand Up @@ -214,7 +214,7 @@ public SerdeConfig defaultSerializationConfig() {
@ExpectSystemExitWithStatus(1)
void shouldExitWithErrorInTopology() throws InterruptedException {
final String input = "input";
try (final ConfluentKafkaContainer kafkaCluster = newKafkaCluster();
try (final KafkaContainer kafkaCluster = newKafkaCluster();
final KafkaStreamsApplication<?> app = new SimpleKafkaStreamsApplication<>(() -> new StreamsApp() {
@Override
public void buildTopology(final TopologyBuilder builder) {
Expand Down Expand Up @@ -251,7 +251,7 @@ public SerdeConfig defaultSerializationConfig() {
void shouldExitWithSuccessCodeOnShutdown() {
final String input = "input";
final String output = "output";
try (final ConfluentKafkaContainer kafkaCluster = newKafkaCluster();
try (final KafkaContainer kafkaCluster = newKafkaCluster();
final KafkaStreamsApplication<?> app = new SimpleKafkaStreamsApplication<>(() -> new StreamsApp() {
@Override
public void buildTopology(final TopologyBuilder builder) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,15 +52,15 @@
import org.junit.jupiter.api.extension.RegisterExtension;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.kafka.ConfluentKafkaContainer;
import org.testcontainers.kafka.KafkaContainer;

@Testcontainers
class RunProducerAppTest {
private static final Duration TIMEOUT = Duration.ofSeconds(10);
@RegisterExtension
final SchemaRegistryMockExtension schemaRegistryMockExtension = new SchemaRegistryMockExtension();
@Container
private final ConfluentKafkaContainer kafkaCluster = newKafkaCluster();
private final KafkaContainer kafkaCluster = newKafkaCluster();

@BeforeEach
void setup() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,14 @@
import org.mockito.junit.jupiter.MockitoExtension;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.kafka.ConfluentKafkaContainer;
import org.testcontainers.kafka.KafkaContainer;

@Testcontainers
@ExtendWith(MockitoExtension.class)
class RunStreamsAppTest {
private static final Duration TIMEOUT = Duration.ofSeconds(10);
@Container
private final ConfluentKafkaContainer kafkaCluster = newKafkaCluster();
private final KafkaContainer kafkaCluster = newKafkaCluster();

@Test
void shouldRunApp() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@
import org.mockito.quality.Strictness;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.kafka.ConfluentKafkaContainer;
import org.testcontainers.kafka.KafkaContainer;

@Testcontainers
@Slf4j
Expand All @@ -66,7 +66,7 @@
class StreamsCleanUpTest {
private static final Duration TIMEOUT = Duration.ofSeconds(10);
@Container
private final ConfluentKafkaContainer kafkaCluster = newKafkaCluster();
private final KafkaContainer kafkaCluster = newKafkaCluster();
@InjectSoftAssertions
private SoftAssertions softly;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,14 @@
import org.junit.jupiter.api.extension.RegisterExtension;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.kafka.ConfluentKafkaContainer;
import org.testcontainers.kafka.KafkaContainer;

@Testcontainers
abstract class KafkaTest {
@RegisterExtension
final SchemaRegistryMockExtension schemaRegistryMockExtension = new SchemaRegistryMockExtension();
@Container
private final ConfluentKafkaContainer kafkaCluster = TestUtil.newKafkaCluster();
private final KafkaContainer kafkaCluster = TestUtil.newKafkaCluster();

KafkaEndpointConfig createEndpointWithoutSchemaRegistry() {
return KafkaEndpointConfig.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@
import org.junit.jupiter.api.extension.RegisterExtension;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.kafka.ConfluentKafkaContainer;
import org.testcontainers.kafka.KafkaContainer;

@Testcontainers
@Slf4j
Expand All @@ -63,7 +63,7 @@ class SchemaTopicClientTest {
@RegisterExtension
final SchemaRegistryMockExtension schemaRegistryMockExtension = new SchemaRegistryMockExtension();
@Container
private final ConfluentKafkaContainer kafkaCluster = newKafkaCluster();
private final KafkaContainer kafkaCluster = newKafkaCluster();

@InjectSoftAssertions
SoftAssertions softly;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,14 @@
import org.junit.jupiter.api.Test;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.kafka.ConfluentKafkaContainer;
import org.testcontainers.kafka.KafkaContainer;

@Testcontainers
class TopicClientTest {

private static final Duration CLIENT_TIMEOUT = Duration.ofSeconds(10L);
@Container
private final ConfluentKafkaContainer kafkaCluster = newKafkaCluster();
private final KafkaContainer kafkaCluster = newKafkaCluster();

@Test
void shouldNotFindTopic() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KeyValue;
import org.testcontainers.kafka.ConfluentKafkaContainer;
import org.testcontainers.kafka.KafkaContainer;

@RequiredArgsConstructor
public class KafkaContainerHelper {
Expand All @@ -58,7 +58,7 @@ public class KafkaContainerHelper {
.partitions(1)
.replicationFactor((short) 1)
.build();
private final @NonNull ConfluentKafkaContainer kafkaCluster;
private final @NonNull KafkaContainer kafkaCluster;

private static <K, V> List<ConsumerRecord<K, V>> pollAll(final Consumer<K, V> consumer, final Duration timeout) {
final List<ConsumerRecord<K, V>> records = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,12 @@
package com.bakdata.kafka;

import lombok.experimental.UtilityClass;
import org.testcontainers.kafka.ConfluentKafkaContainer;
import org.testcontainers.kafka.KafkaContainer;
import org.testcontainers.utility.DockerImageName;

@UtilityClass
public class TestUtil {
public static ConfluentKafkaContainer newKafkaCluster() {
return new ConfluentKafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.6.0"));
public static KafkaContainer newKafkaCluster() {
return new KafkaContainer(DockerImageName.parse("apache/kafka:3.7.1"));
}
}