Skip to content

Commit

Permalink
Update
Browse files Browse the repository at this point in the history
  • Loading branch information
philipp94831 committed Jan 20, 2025
1 parent 4bf9fb5 commit 1601954
Show file tree
Hide file tree
Showing 12 changed files with 78 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

package com.bakdata.kafka;

import static com.bakdata.kafka.KafkaTest.POLL_TIMEOUT;
import static com.bakdata.kafka.KafkaTest.newCluster;
import static org.assertj.core.api.Assertions.assertThat;

Expand All @@ -32,9 +33,13 @@
import java.time.Duration;
import java.util.List;
import java.util.regex.Pattern;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serdes.StringSerde;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.kstream.Consumed;
import org.junit.jupiter.api.Test;
import org.testcontainers.kafka.KafkaContainer;
Expand Down Expand Up @@ -240,6 +245,8 @@ public SerdeConfig defaultSerializationConfig() {
new KafkaTestClient(KafkaEndpointConfig.builder()
.bootstrapServers(kafkaCluster.getBootstrapServers())
.build()).send()
.with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class)
.with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class)
.to(input, List.of(new SimpleProducerRecord<>("foo", "bar")));
Thread.sleep(Duration.ofSeconds(10).toMillis());
}
Expand Down Expand Up @@ -280,9 +287,13 @@ public SerdeConfig defaultSerializationConfig() {
"--output-topic", output
);
testClient.send()
.with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class)
.with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class)
.to(input, List.of(new SimpleProducerRecord<>("foo", "bar")));
final List<ConsumerRecord<String, String>> keyValues = testClient.read()
.from(output, Duration.ofSeconds(10));
.with(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class)
.with(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class)
.from(output, POLL_TIMEOUT);
assertThat(keyValues)
.hasSize(1)
.anySatisfy(kv -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public SerializerConfig defaultSerializationConfig() {
assertThat(testClient.read()
.with(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class)
.with(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, SpecificAvroDeserializer.class)
.<String, TestRecord>from(output, TIMEOUT))
.<String, TestRecord>from(output, POLL_TIMEOUT))
.hasSize(1)
.anySatisfy(kv -> {
assertThat(kv.key()).isEqualTo("foo");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import com.bakdata.kafka.SenderBuilder.SimpleProducerRecord;
import com.bakdata.kafka.SimpleKafkaStreamsApplication;
import com.bakdata.kafka.test_applications.Mirror;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
Expand All @@ -45,7 +44,6 @@

@ExtendWith(MockitoExtension.class)
class RunStreamsAppTest extends KafkaTest {
private static final Duration TIMEOUT = Duration.ofSeconds(10);

@Test
void shouldRunApp() {
Expand All @@ -69,7 +67,7 @@ void shouldRunApp() {
assertThat(testClient.read()
.with(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class)
.with(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class)
.from(output, TIMEOUT))
.from(output, POLL_TIMEOUT))
.hasSize(1);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,10 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.assertj.core.api.SoftAssertions;
Expand Down Expand Up @@ -80,6 +83,8 @@ void shouldClean() throws InterruptedException {
final KafkaTestClient testClient = this.newTestClient();
testClient.createTopic(app.getOutputTopic());
testClient.send()
.with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class)
.with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class)
.to(app.getInputTopics().get(0), List.of(
new SimpleProducerRecord<>(null, "blub"),
new SimpleProducerRecord<>(null, "bla"),
Expand Down Expand Up @@ -114,6 +119,8 @@ void shouldReset() throws InterruptedException {
final KafkaTestClient testClient = this.newTestClient();
testClient.createTopic(app.getOutputTopic());
testClient.send()
.with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class)
.with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class)
.to(app.getInputTopics().get(0), List.of(
new SimpleProducerRecord<>(null, "blub"),
new SimpleProducerRecord<>(null, "bla"),
Expand Down Expand Up @@ -169,8 +176,9 @@ private CloseFlagApp createCloseFlagApplication() {

private List<KeyValue<String, Long>> readOutputTopic(final String outputTopic) {
final List<ConsumerRecord<String, Long>> records = this.newTestClient().read()
.with(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class)
.with(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class)
.from(outputTopic, TIMEOUT);
.from(outputTopic, POLL_TIMEOUT);
return records.stream()
.map(consumerRecord -> new KeyValue<>(consumerRecord.key(), consumerRecord.value()))
.collect(Collectors.toList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,11 @@
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import java.io.IOException;
import java.time.Duration;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.streams.KeyValue;
import org.assertj.core.api.SoftAssertions;
import org.assertj.core.api.junit.jupiter.InjectSoftAssertions;
Expand Down Expand Up @@ -174,8 +175,10 @@ public ProducerCleanUpConfiguration setupCleanUp(
}

private List<KeyValue<String, String>> readOutputTopic(final String outputTopic) {
final List<ConsumerRecord<String, String>> records =
this.newTestClient().read().from(outputTopic, Duration.ofSeconds(1L));
final List<ConsumerRecord<String, String>> records = this.newTestClient().read()
.with(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class)
.with(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class)
.from(outputTopic, POLL_TIMEOUT);
return records.stream()
.map(StreamsCleanUpRunnerTest::toKeyValue)
.collect(Collectors.toList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,11 @@
import com.bakdata.kafka.ProducerApp;
import com.bakdata.kafka.ProducerRunner;
import com.bakdata.kafka.ProducerTopicConfig;
import java.time.Duration;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.streams.KeyValue;
import org.assertj.core.api.SoftAssertions;
import org.assertj.core.api.junit.jupiter.InjectSoftAssertions;
Expand Down Expand Up @@ -72,8 +73,10 @@ void shouldRunApp() {
}

private List<KeyValue<String, String>> readOutputTopic(final String outputTopic) {
final List<ConsumerRecord<String, String>> records =
this.newTestClient().read().from(outputTopic, Duration.ofSeconds(1L));
final List<ConsumerRecord<String, String>> records = this.newTestClient().read()
.with(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class)
.with(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class)
.from(outputTopic, POLL_TIMEOUT);
return records.stream()
.map(StreamsCleanUpRunnerTest::toKeyValue)
.collect(Collectors.toList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KeyValue;
import org.assertj.core.api.SoftAssertions;
Expand Down Expand Up @@ -148,6 +149,8 @@ void shouldDeleteTopic() throws InterruptedException {
final KafkaTestClient testClient = this.newTestClient();
testClient.createTopic(app.getTopics().getOutputTopic());
testClient.send()
.with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class)
.with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class)
.to(app.getTopics().getInputTopics().get(0), List.of(
new SimpleProducerRecord<>(null, "blub"),
new SimpleProducerRecord<>(null, "bla"),
Expand Down Expand Up @@ -184,6 +187,8 @@ void shouldDeleteConsumerGroup() throws InterruptedException {
final KafkaTestClient testClient = this.newTestClient();
testClient.createTopic(app.getTopics().getOutputTopic());
testClient.send()
.with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class)
.with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class)
.to(app.getTopics().getInputTopics().get(0), List.of(
new SimpleProducerRecord<>(null, "blub"),
new SimpleProducerRecord<>(null, "bla"),
Expand All @@ -200,7 +205,7 @@ void shouldDeleteConsumerGroup() throws InterruptedException {
this.assertContent(app.getTopics().getOutputTopic(), expectedValues,
"WordCount contains all elements after first run");

try (final ImprovedAdminClient adminClient = this.createAdminClient();
try (final ImprovedAdminClient adminClient = testClient.admin();
final ConsumerGroupClient consumerGroupClient = adminClient.getConsumerGroupClient()) {
this.softly.assertThat(consumerGroupClient.exists(app.getUniqueAppId()))
.as("Consumer group exists")
Expand All @@ -210,7 +215,7 @@ void shouldDeleteConsumerGroup() throws InterruptedException {
Thread.sleep(TIMEOUT.toMillis());
clean(executableApp);

try (final ImprovedAdminClient adminClient = this.createAdminClient();
try (final ImprovedAdminClient adminClient = testClient.admin();
final ConsumerGroupClient consumerGroupClient = adminClient.getConsumerGroupClient()) {
this.softly.assertThat(consumerGroupClient.exists(app.getUniqueAppId()))
.as("Consumer group is deleted")
Expand All @@ -227,6 +232,8 @@ void shouldNotThrowAnErrorIfConsumerGroupDoesNotExist() throws InterruptedExcept
final KafkaTestClient testClient = this.newTestClient();
testClient.createTopic(app.getTopics().getOutputTopic());
testClient.send()
.with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class)
.with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class)
.to(app.getTopics().getInputTopics().get(0), List.of(
new SimpleProducerRecord<>(null, "blub"),
new SimpleProducerRecord<>(null, "bla"),
Expand All @@ -243,7 +250,7 @@ void shouldNotThrowAnErrorIfConsumerGroupDoesNotExist() throws InterruptedExcept
this.assertContent(app.getTopics().getOutputTopic(), expectedValues,
"WordCount contains all elements after first run");

try (final ImprovedAdminClient adminClient = this.createAdminClient();
try (final ImprovedAdminClient adminClient = testClient.admin();
final ConsumerGroupClient consumerGroupClient = adminClient.getConsumerGroupClient()) {
this.softly.assertThat(consumerGroupClient.exists(app.getUniqueAppId()))
.as("Consumer group exists")
Expand All @@ -252,7 +259,7 @@ void shouldNotThrowAnErrorIfConsumerGroupDoesNotExist() throws InterruptedExcept

Thread.sleep(TIMEOUT.toMillis());

try (final ImprovedAdminClient adminClient = this.createAdminClient();
try (final ImprovedAdminClient adminClient = testClient.admin();
final ConsumerGroupClient consumerGroupClient = adminClient.getConsumerGroupClient()) {
consumerGroupClient.deleteConsumerGroup(app.getUniqueAppId());
this.softly.assertThat(consumerGroupClient.exists(app.getUniqueAppId()))
Expand Down Expand Up @@ -362,6 +369,8 @@ void shouldDeleteState() throws InterruptedException {
final KafkaTestClient testClient = this.newTestClient();
testClient.createTopic(app.getTopics().getOutputTopic());
testClient.send()
.with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class)
.with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class)
.to(app.getTopics().getInputTopics().get(0), List.of(
new SimpleProducerRecord<>(null, "blub"),
new SimpleProducerRecord<>(null, "bla"),
Expand Down Expand Up @@ -398,6 +407,8 @@ void shouldReprocessAlreadySeenRecords() throws InterruptedException {
final KafkaTestClient testClient = this.newTestClient();
testClient.createTopic(app.getTopics().getOutputTopic());
testClient.send()
.with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class)
.with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class)
.to(app.getTopics().getInputTopics().get(0), List.of(
new SimpleProducerRecord<>(null, "a"),
new SimpleProducerRecord<>(null, "b"),
Expand Down Expand Up @@ -429,6 +440,8 @@ void shouldDeleteValueSchema()
final KafkaTestClient testClient = this.newTestClient();
testClient.createTopic(app.getTopics().getOutputTopic());
testClient.send()
.with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class)
.with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class)
.with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName())
.to(app.getTopics().getInputTopics().get(0), List.of(
new SimpleProducerRecord<>(null, testRecord)
Expand Down Expand Up @@ -460,6 +473,7 @@ void shouldDeleteKeySchema()
testClient.createTopic(app.getTopics().getOutputTopic());
testClient.send()
.with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName())
.with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class)
.to(app.getTopics().getInputTopics().get(0), List.of(
new SimpleProducerRecord<>(testRecord, "val")
));
Expand Down Expand Up @@ -613,11 +627,15 @@ void shouldReprocessAlreadySeenRecordsWithPattern() throws InterruptedException
final KafkaTestClient testClient = this.newTestClient();
testClient.createTopic(app.getTopics().getOutputTopic());
testClient.send()
.with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class)
.with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class)
.to("input_topic", List.of(
new SimpleProducerRecord<>(null, "a"),
new SimpleProducerRecord<>(null, "b")
));
testClient.send()
.with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class)
.with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class)
.to("another_topic", List.of(
new SimpleProducerRecord<>(null, "c")
));
Expand Down Expand Up @@ -659,14 +677,11 @@ public StreamsCleanUpConfiguration setupCleanUp(
.build());
}

private ImprovedAdminClient createAdminClient() {
return ImprovedAdminClient.create(this.createEndpoint().createKafkaProperties());
}

private List<KeyValue<String, Long>> readOutputTopic(final String outputTopic) {
final List<ConsumerRecord<String, Long>> records = this.newTestClient().read()
.with(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class)
.with(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class)
.from(outputTopic, TIMEOUT);
.from(outputTopic, POLL_TIMEOUT);
return records.stream()
.map(StreamsCleanUpRunnerTest::toKeyValue)
.collect(Collectors.toList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ void shouldRunApp() {
this.softly.assertThat(testClient.read()
.with(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class)
.with(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class)
.from(outputTopic, TIMEOUT))
.from(outputTopic, POLL_TIMEOUT))
.hasSize(1);
}
}
Expand Down Expand Up @@ -166,7 +166,7 @@ void shouldUseMultipleLabeledInputTopics() {
this.softly.assertThat(testClient.read()
.with(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class)
.with(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class)
.from(outputTopic, TIMEOUT))
.from(outputTopic, POLL_TIMEOUT))
.hasSize(2);
}
}
Expand Down
Loading

0 comments on commit 1601954

Please sign in to comment.