diff --git a/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/CliTest.java b/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/CliTest.java index 5d9d6616..b777d932 100644 --- a/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/CliTest.java +++ b/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/CliTest.java @@ -24,6 +24,7 @@ package com.bakdata.kafka; +import static com.bakdata.kafka.KafkaTest.POLL_TIMEOUT; import static com.bakdata.kafka.KafkaTest.newCluster; import static org.assertj.core.api.Assertions.assertThat; @@ -32,9 +33,13 @@ import java.time.Duration; import java.util.List; import java.util.regex.Pattern; +import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.Serdes.StringSerde; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.streams.kstream.Consumed; import org.junit.jupiter.api.Test; import org.testcontainers.kafka.KafkaContainer; @@ -240,6 +245,8 @@ public SerdeConfig defaultSerializationConfig() { new KafkaTestClient(KafkaEndpointConfig.builder() .bootstrapServers(kafkaCluster.getBootstrapServers()) .build()).send() + .with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class) + .with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class) .to(input, List.of(new SimpleProducerRecord<>("foo", "bar"))); Thread.sleep(Duration.ofSeconds(10).toMillis()); } @@ -280,9 +287,13 @@ public SerdeConfig defaultSerializationConfig() { "--output-topic", output ); testClient.send() + .with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class) + .with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class) .to(input, List.of(new SimpleProducerRecord<>("foo", "bar"))); final List> keyValues = testClient.read() - .from(output, Duration.ofSeconds(10)); + .with(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class) + .with(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class) + .from(output, POLL_TIMEOUT); assertThat(keyValues) .hasSize(1) .anySatisfy(kv -> { diff --git a/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/RunProducerAppTest.java b/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/RunProducerAppTest.java index a752f3db..f5249942 100644 --- a/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/RunProducerAppTest.java +++ b/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/RunProducerAppTest.java @@ -77,7 +77,7 @@ public SerializerConfig defaultSerializationConfig() { assertThat(testClient.read() .with(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class) .with(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, SpecificAvroDeserializer.class) - .from(output, TIMEOUT)) + .from(output, POLL_TIMEOUT)) .hasSize(1) .anySatisfy(kv -> { assertThat(kv.key()).isEqualTo("foo"); diff --git a/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/RunStreamsAppTest.java b/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/RunStreamsAppTest.java index 120c6b57..60569ad8 100644 --- a/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/RunStreamsAppTest.java +++ b/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/RunStreamsAppTest.java @@ -32,7 +32,6 @@ import com.bakdata.kafka.SenderBuilder.SimpleProducerRecord; import com.bakdata.kafka.SimpleKafkaStreamsApplication; import com.bakdata.kafka.test_applications.Mirror; -import java.time.Duration; import java.util.List; import java.util.Map; import org.apache.kafka.clients.consumer.ConsumerConfig; @@ -45,7 +44,6 @@ @ExtendWith(MockitoExtension.class) class RunStreamsAppTest extends KafkaTest { - private static final Duration TIMEOUT = Duration.ofSeconds(10); @Test void shouldRunApp() { @@ -69,7 +67,7 @@ void shouldRunApp() { assertThat(testClient.read() .with(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class) .with(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class) - .from(output, TIMEOUT)) + .from(output, POLL_TIMEOUT)) .hasSize(1); } } diff --git a/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpTest.java b/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpTest.java index 83f807d3..c309c6f8 100644 --- a/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpTest.java +++ b/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpTest.java @@ -41,7 +41,10 @@ import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.LongDeserializer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsConfig; import org.assertj.core.api.SoftAssertions; @@ -80,6 +83,8 @@ void shouldClean() throws InterruptedException { final KafkaTestClient testClient = this.newTestClient(); testClient.createTopic(app.getOutputTopic()); testClient.send() + .with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class) + .with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class) .to(app.getInputTopics().get(0), List.of( new SimpleProducerRecord<>(null, "blub"), new SimpleProducerRecord<>(null, "bla"), @@ -114,6 +119,8 @@ void shouldReset() throws InterruptedException { final KafkaTestClient testClient = this.newTestClient(); testClient.createTopic(app.getOutputTopic()); testClient.send() + .with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class) + .with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class) .to(app.getInputTopics().get(0), List.of( new SimpleProducerRecord<>(null, "blub"), new SimpleProducerRecord<>(null, "bla"), @@ -169,8 +176,9 @@ private CloseFlagApp createCloseFlagApplication() { private List> readOutputTopic(final String outputTopic) { final List> records = this.newTestClient().read() + .with(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class) .with(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class) - .from(outputTopic, TIMEOUT); + .from(outputTopic, POLL_TIMEOUT); return records.stream() .map(consumerRecord -> new KeyValue<>(consumerRecord.key(), consumerRecord.value())) .collect(Collectors.toList()); diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/ProducerCleanUpRunnerTest.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/ProducerCleanUpRunnerTest.java index 325479e8..13263734 100644 --- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/ProducerCleanUpRunnerTest.java +++ b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/ProducerCleanUpRunnerTest.java @@ -47,10 +47,11 @@ import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException; import java.io.IOException; -import java.time.Duration; import java.util.List; import java.util.stream.Collectors; +import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.streams.KeyValue; import org.assertj.core.api.SoftAssertions; import org.assertj.core.api.junit.jupiter.InjectSoftAssertions; @@ -174,8 +175,10 @@ public ProducerCleanUpConfiguration setupCleanUp( } private List> readOutputTopic(final String outputTopic) { - final List> records = - this.newTestClient().read().from(outputTopic, Duration.ofSeconds(1L)); + final List> records = this.newTestClient().read() + .with(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class) + .with(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class) + .from(outputTopic, POLL_TIMEOUT); return records.stream() .map(StreamsCleanUpRunnerTest::toKeyValue) .collect(Collectors.toList()); diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/ProducerRunnerTest.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/ProducerRunnerTest.java index dfd480b0..9daec3ce 100644 --- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/ProducerRunnerTest.java +++ b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/ProducerRunnerTest.java @@ -32,10 +32,11 @@ import com.bakdata.kafka.ProducerApp; import com.bakdata.kafka.ProducerRunner; import com.bakdata.kafka.ProducerTopicConfig; -import java.time.Duration; import java.util.List; import java.util.stream.Collectors; +import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.streams.KeyValue; import org.assertj.core.api.SoftAssertions; import org.assertj.core.api.junit.jupiter.InjectSoftAssertions; @@ -72,8 +73,10 @@ void shouldRunApp() { } private List> readOutputTopic(final String outputTopic) { - final List> records = - this.newTestClient().read().from(outputTopic, Duration.ofSeconds(1L)); + final List> records = this.newTestClient().read() + .with(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class) + .with(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class) + .from(outputTopic, POLL_TIMEOUT); return records.stream() .map(StreamsCleanUpRunnerTest::toKeyValue) .collect(Collectors.toList()); diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpRunnerTest.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpRunnerTest.java index 26477c90..58ca2f85 100644 --- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpRunnerTest.java +++ b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpRunnerTest.java @@ -66,6 +66,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.LongDeserializer; +import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.streams.KeyValue; import org.assertj.core.api.SoftAssertions; @@ -148,6 +149,8 @@ void shouldDeleteTopic() throws InterruptedException { final KafkaTestClient testClient = this.newTestClient(); testClient.createTopic(app.getTopics().getOutputTopic()); testClient.send() + .with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class) + .with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class) .to(app.getTopics().getInputTopics().get(0), List.of( new SimpleProducerRecord<>(null, "blub"), new SimpleProducerRecord<>(null, "bla"), @@ -184,6 +187,8 @@ void shouldDeleteConsumerGroup() throws InterruptedException { final KafkaTestClient testClient = this.newTestClient(); testClient.createTopic(app.getTopics().getOutputTopic()); testClient.send() + .with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class) + .with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class) .to(app.getTopics().getInputTopics().get(0), List.of( new SimpleProducerRecord<>(null, "blub"), new SimpleProducerRecord<>(null, "bla"), @@ -200,7 +205,7 @@ void shouldDeleteConsumerGroup() throws InterruptedException { this.assertContent(app.getTopics().getOutputTopic(), expectedValues, "WordCount contains all elements after first run"); - try (final ImprovedAdminClient adminClient = this.createAdminClient(); + try (final ImprovedAdminClient adminClient = testClient.admin(); final ConsumerGroupClient consumerGroupClient = adminClient.getConsumerGroupClient()) { this.softly.assertThat(consumerGroupClient.exists(app.getUniqueAppId())) .as("Consumer group exists") @@ -210,7 +215,7 @@ void shouldDeleteConsumerGroup() throws InterruptedException { Thread.sleep(TIMEOUT.toMillis()); clean(executableApp); - try (final ImprovedAdminClient adminClient = this.createAdminClient(); + try (final ImprovedAdminClient adminClient = testClient.admin(); final ConsumerGroupClient consumerGroupClient = adminClient.getConsumerGroupClient()) { this.softly.assertThat(consumerGroupClient.exists(app.getUniqueAppId())) .as("Consumer group is deleted") @@ -227,6 +232,8 @@ void shouldNotThrowAnErrorIfConsumerGroupDoesNotExist() throws InterruptedExcept final KafkaTestClient testClient = this.newTestClient(); testClient.createTopic(app.getTopics().getOutputTopic()); testClient.send() + .with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class) + .with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class) .to(app.getTopics().getInputTopics().get(0), List.of( new SimpleProducerRecord<>(null, "blub"), new SimpleProducerRecord<>(null, "bla"), @@ -243,7 +250,7 @@ void shouldNotThrowAnErrorIfConsumerGroupDoesNotExist() throws InterruptedExcept this.assertContent(app.getTopics().getOutputTopic(), expectedValues, "WordCount contains all elements after first run"); - try (final ImprovedAdminClient adminClient = this.createAdminClient(); + try (final ImprovedAdminClient adminClient = testClient.admin(); final ConsumerGroupClient consumerGroupClient = adminClient.getConsumerGroupClient()) { this.softly.assertThat(consumerGroupClient.exists(app.getUniqueAppId())) .as("Consumer group exists") @@ -252,7 +259,7 @@ void shouldNotThrowAnErrorIfConsumerGroupDoesNotExist() throws InterruptedExcept Thread.sleep(TIMEOUT.toMillis()); - try (final ImprovedAdminClient adminClient = this.createAdminClient(); + try (final ImprovedAdminClient adminClient = testClient.admin(); final ConsumerGroupClient consumerGroupClient = adminClient.getConsumerGroupClient()) { consumerGroupClient.deleteConsumerGroup(app.getUniqueAppId()); this.softly.assertThat(consumerGroupClient.exists(app.getUniqueAppId())) @@ -362,6 +369,8 @@ void shouldDeleteState() throws InterruptedException { final KafkaTestClient testClient = this.newTestClient(); testClient.createTopic(app.getTopics().getOutputTopic()); testClient.send() + .with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class) + .with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class) .to(app.getTopics().getInputTopics().get(0), List.of( new SimpleProducerRecord<>(null, "blub"), new SimpleProducerRecord<>(null, "bla"), @@ -398,6 +407,8 @@ void shouldReprocessAlreadySeenRecords() throws InterruptedException { final KafkaTestClient testClient = this.newTestClient(); testClient.createTopic(app.getTopics().getOutputTopic()); testClient.send() + .with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class) + .with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class) .to(app.getTopics().getInputTopics().get(0), List.of( new SimpleProducerRecord<>(null, "a"), new SimpleProducerRecord<>(null, "b"), @@ -429,6 +440,8 @@ void shouldDeleteValueSchema() final KafkaTestClient testClient = this.newTestClient(); testClient.createTopic(app.getTopics().getOutputTopic()); testClient.send() + .with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class) + .with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class) .with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName()) .to(app.getTopics().getInputTopics().get(0), List.of( new SimpleProducerRecord<>(null, testRecord) @@ -460,6 +473,7 @@ void shouldDeleteKeySchema() testClient.createTopic(app.getTopics().getOutputTopic()); testClient.send() .with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName()) + .with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class) .to(app.getTopics().getInputTopics().get(0), List.of( new SimpleProducerRecord<>(testRecord, "val") )); @@ -613,11 +627,15 @@ void shouldReprocessAlreadySeenRecordsWithPattern() throws InterruptedException final KafkaTestClient testClient = this.newTestClient(); testClient.createTopic(app.getTopics().getOutputTopic()); testClient.send() + .with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class) + .with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class) .to("input_topic", List.of( new SimpleProducerRecord<>(null, "a"), new SimpleProducerRecord<>(null, "b") )); testClient.send() + .with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class) + .with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class) .to("another_topic", List.of( new SimpleProducerRecord<>(null, "c") )); @@ -659,14 +677,11 @@ public StreamsCleanUpConfiguration setupCleanUp( .build()); } - private ImprovedAdminClient createAdminClient() { - return ImprovedAdminClient.create(this.createEndpoint().createKafkaProperties()); - } - private List> readOutputTopic(final String outputTopic) { final List> records = this.newTestClient().read() + .with(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class) .with(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class) - .from(outputTopic, TIMEOUT); + .from(outputTopic, POLL_TIMEOUT); return records.stream() .map(StreamsCleanUpRunnerTest::toKeyValue) .collect(Collectors.toList()); diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/StreamsRunnerTest.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/StreamsRunnerTest.java index 612955e0..bfe14596 100644 --- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/StreamsRunnerTest.java +++ b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/StreamsRunnerTest.java @@ -136,7 +136,7 @@ void shouldRunApp() { this.softly.assertThat(testClient.read() .with(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class) .with(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class) - .from(outputTopic, TIMEOUT)) + .from(outputTopic, POLL_TIMEOUT)) .hasSize(1); } } @@ -166,7 +166,7 @@ void shouldUseMultipleLabeledInputTopics() { this.softly.assertThat(testClient.read() .with(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class) .with(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class) - .from(outputTopic, TIMEOUT)) + .from(outputTopic, POLL_TIMEOUT)) .hasSize(2); } } diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/util/SchemaTopicClientTest.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/util/SchemaTopicClientTest.java index 0761c396..932a3819 100644 --- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/util/SchemaTopicClientTest.java +++ b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/util/SchemaTopicClientTest.java @@ -41,6 +41,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.StringSerializer; import org.assertj.core.api.SoftAssertions; import org.assertj.core.api.junit.jupiter.InjectSoftAssertions; import org.assertj.core.api.junit.jupiter.SoftAssertionsExtension; @@ -68,6 +69,7 @@ void shouldDeleteTopicAndSchemaWhenSchemaRegistryUrlIsSet() .isTrue(); testClient.send() + .with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class) .with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, SpecificAvroSerializer.class) .to(TOPIC, List.of( new SimpleProducerRecord<>(null, TestRecord.newBuilder().setContent("foo").build()) @@ -101,6 +103,7 @@ void shouldResetSchema() throws InterruptedException, IOException, RestClientExc .isTrue(); testClient.send() + .with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class) .with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, SpecificAvroSerializer.class) .to(TOPIC, List.of( new SimpleProducerRecord<>(null, TestRecord.newBuilder().setContent("foo").build()) @@ -135,6 +138,7 @@ void shouldDeleteTopicAndKeepSchemaWhenSchemaRegistryUrlIsNotSet() throws Interr .isTrue(); testClient.send() + .with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class) .with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, SpecificAvroSerializer.class) .to(TOPIC, List.of( new SimpleProducerRecord<>(null, TestRecord.newBuilder().setContent("foo").build()) diff --git a/streams-bootstrap-core/src/testFixtures/java/com/bakdata/kafka/KafkaTest.java b/streams-bootstrap-core/src/testFixtures/java/com/bakdata/kafka/KafkaTest.java index c58c8722..94872064 100644 --- a/streams-bootstrap-core/src/testFixtures/java/com/bakdata/kafka/KafkaTest.java +++ b/streams-bootstrap-core/src/testFixtures/java/com/bakdata/kafka/KafkaTest.java @@ -25,6 +25,7 @@ package com.bakdata.kafka; import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; +import java.time.Duration; import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Testcontainers; import org.testcontainers.kafka.KafkaContainer; @@ -32,12 +33,14 @@ @Testcontainers public abstract class KafkaTest { + protected static final Duration POLL_TIMEOUT = Duration.ofSeconds(10); private final TestTopologyFactory testTopologyFactory = TestTopologyFactory.withSchemaRegistry(); @Container private final KafkaContainer kafkaCluster = newCluster(); public static KafkaContainer newCluster() { - return new KafkaContainer(DockerImageName.parse("apache/kafka-native:3.8.1")); + return new KafkaContainer(DockerImageName.parse("apache/kafka-native") + .withTag("3.8.1")); } protected KafkaEndpointConfig createEndpointWithoutSchemaRegistry() { diff --git a/streams-bootstrap-test/src/main/java/com/bakdata/kafka/KafkaTestClient.java b/streams-bootstrap-test/src/main/java/com/bakdata/kafka/KafkaTestClient.java index 8f951cc9..7d3ea30b 100644 --- a/streams-bootstrap-test/src/main/java/com/bakdata/kafka/KafkaTestClient.java +++ b/streams-bootstrap-test/src/main/java/com/bakdata/kafka/KafkaTestClient.java @@ -33,10 +33,6 @@ import java.util.Map; import lombok.NonNull; import lombok.RequiredArgsConstructor; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.common.serialization.StringDeserializer; -import org.apache.kafka.common.serialization.StringSerializer; /** * Client that supports communication with Kafka clusters in test setups, including topic management, reading from @@ -59,23 +55,19 @@ public static TopicSettingsBuilder defaultTopicSettings() { } /** - * Prepare sending new data to the cluster. {@link StringSerializer} is configured by default. + * Prepare sending new data to the cluster * @return configured {@code SenderBuilder} */ public SenderBuilder send() { - return new SenderBuilder(this.endpointConfig.createKafkaProperties()) - .with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class) - .with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + return new SenderBuilder(this.endpointConfig.createKafkaProperties()); } /** - * Prepare reading data from the cluster. {@link StringDeserializer} is configured by default. + * Prepare reading data from the cluster * @return configured {@code ReaderBuilder} */ public ReaderBuilder read() { - return new ReaderBuilder(this.endpointConfig.createKafkaProperties()) - .with(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class) - .with(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + return new ReaderBuilder(this.endpointConfig.createKafkaProperties()); } /** diff --git a/streams-bootstrap-test/src/main/java/com/bakdata/kafka/ReaderBuilder.java b/streams-bootstrap-test/src/main/java/com/bakdata/kafka/ReaderBuilder.java index a50a926b..4bd134f4 100644 --- a/streams-bootstrap-test/src/main/java/com/bakdata/kafka/ReaderBuilder.java +++ b/streams-bootstrap-test/src/main/java/com/bakdata/kafka/ReaderBuilder.java @@ -61,13 +61,17 @@ private static List> readAll(final Consumer co final Duration timeout) { final List partitionInfos = consumer.listTopics().get(topic); final List topicPartitions = partitionInfos.stream() - .map(partition -> new TopicPartition(partition.topic(), partition.partition())) + .map(ReaderBuilder::toTopicPartition) .collect(Collectors.toList()); consumer.assign(topicPartitions); consumer.seekToBeginning(topicPartitions); return pollAll(consumer, timeout); } + private static TopicPartition toTopicPartition(final PartitionInfo partition) { + return new TopicPartition(partition.topic(), partition.partition()); + } + /** * Add a consumer configuration * @param key configuration key