Skip to content

Commit

Permalink
Add methods for simplified testing of Kafka endpoints
Browse files Browse the repository at this point in the history
  • Loading branch information
philipp94831 committed Jan 16, 2025
1 parent 8759f60 commit f353ec1
Show file tree
Hide file tree
Showing 8 changed files with 13 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
package com.bakdata.kafka;

import static com.bakdata.kafka.KafkaTest.newCluster;
import static com.bakdata.kafka.KafkaTest.newTestClient;
import static org.assertj.core.api.Assertions.assertThat;

import com.bakdata.kafka.SenderBuilder.SimpleProducerRecord;
Expand Down Expand Up @@ -238,7 +237,9 @@ public SerdeConfig defaultSerializationConfig() {
"--bootstrap-server", kafkaCluster.getBootstrapServers(),
"--input-topics", input
);
newTestClient(kafkaCluster).send()
new KafkaTestClient(KafkaEndpointConfig.builder()
.bootstrapServers(kafkaCluster.getBootstrapServers())
.build()).send()
.to(input, List.of(new SimpleProducerRecord<>("foo", "bar")));
Thread.sleep(Duration.ofSeconds(10).toMillis());
}
Expand Down Expand Up @@ -268,7 +269,9 @@ public SerdeConfig defaultSerializationConfig() {
}
})) {
kafkaCluster.start();
final KafkaTestClient testClient = newTestClient(kafkaCluster);
final KafkaTestClient testClient = new KafkaTestClient(KafkaEndpointConfig.builder()
.bootstrapServers(kafkaCluster.getBootstrapServers())
.build());
testClient.createTopic(output);

runApp(app,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import com.bakdata.kafka.SimpleKafkaProducerApplication;
import com.bakdata.kafka.TestRecord;
import com.bakdata.kafka.util.ImprovedAdminClient;
import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
import io.confluent.kafka.streams.serdes.avro.SpecificAvroDeserializer;
import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer;
import java.time.Duration;
Expand Down Expand Up @@ -78,7 +77,6 @@ public SerializerConfig defaultSerializationConfig() {
assertThat(testClient.read()
.with(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class)
.with(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, SpecificAvroDeserializer.class)
.with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl)
.<String, TestRecord>from(output, TIMEOUT))
.hasSize(1)
.anySatisfy(kv -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@
import com.bakdata.kafka.util.TopicClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import java.io.IOException;
import java.time.Duration;
Expand Down Expand Up @@ -273,7 +272,6 @@ void shouldDeleteInternalTopics() throws InterruptedException {
final KafkaTestClient testClient = this.newTestClient();
testClient.createTopic(app.getTopics().getOutputTopic());
testClient.send()
.with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, this.getSchemaRegistryUrl())
.with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName())
.with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName())
.to(app.getTopics().getInputTopics().get(0), List.of(
Expand Down Expand Up @@ -324,7 +322,6 @@ void shouldDeleteIntermediateTopics() throws InterruptedException {
final KafkaTestClient testClient = this.newTestClient();
testClient.createTopic(app.getTopics().getOutputTopic());
testClient.send()
.with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, this.getSchemaRegistryUrl())
.with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName())
.with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName())
.to(app.getTopics().getInputTopics().get(0), List.of(
Expand Down Expand Up @@ -432,7 +429,6 @@ void shouldDeleteValueSchema()
final KafkaTestClient testClient = this.newTestClient();
testClient.createTopic(app.getTopics().getOutputTopic());
testClient.send()
.with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, this.getSchemaRegistryUrl())
.with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName())
.to(app.getTopics().getInputTopics().get(0), List.of(
new SimpleProducerRecord<>(null, testRecord)
Expand Down Expand Up @@ -463,7 +459,6 @@ void shouldDeleteKeySchema()
final KafkaTestClient testClient = this.newTestClient();
testClient.createTopic(app.getTopics().getOutputTopic());
testClient.send()
.with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, this.getSchemaRegistryUrl())
.with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName())
.to(app.getTopics().getInputTopics().get(0), List.of(
new SimpleProducerRecord<>(testRecord, "val")
Expand Down Expand Up @@ -494,7 +489,6 @@ void shouldDeleteSchemaOfInternalTopics()
final KafkaTestClient testClient = this.newTestClient();
testClient.createTopic(app.getTopics().getOutputTopic());
testClient.send()
.with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, this.getSchemaRegistryUrl())
.with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName())
.with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName())
.to(app.getTopics().getInputTopics().get(0), List.of(
Expand Down Expand Up @@ -533,7 +527,6 @@ void shouldDeleteSchemaOfIntermediateTopics()
final KafkaTestClient testClient = this.newTestClient();
testClient.createTopic(app.getTopics().getOutputTopic());
testClient.send()
.with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, this.getSchemaRegistryUrl())
.with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName())
.with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName())
.to(app.getTopics().getInputTopics().get(0), List.of(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import com.bakdata.kafka.TestRecord;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer;
import java.io.IOException;
import java.time.Duration;
Expand Down Expand Up @@ -70,7 +69,6 @@ void shouldDeleteTopicAndSchemaWhenSchemaRegistryUrlIsSet()

testClient.send()
.with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, SpecificAvroSerializer.class)
.with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, this.getSchemaRegistryUrl())
.to(TOPIC, List.of(
new SimpleProducerRecord<>(null, TestRecord.newBuilder().setContent("foo").build())
));
Expand Down Expand Up @@ -104,7 +102,6 @@ void shouldResetSchema() throws InterruptedException, IOException, RestClientExc

testClient.send()
.with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, SpecificAvroSerializer.class)
.with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, this.getSchemaRegistryUrl())
.to(TOPIC, List.of(
new SimpleProducerRecord<>(null, TestRecord.newBuilder().setContent("foo").build())
));
Expand Down Expand Up @@ -139,7 +136,6 @@ void shouldDeleteTopicAndKeepSchemaWhenSchemaRegistryUrlIsNotSet() throws Interr

testClient.send()
.with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, SpecificAvroSerializer.class)
.with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, this.getSchemaRegistryUrl())
.to(TOPIC, List.of(
new SimpleProducerRecord<>(null, TestRecord.newBuilder().setContent("foo").build())
));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,6 @@ public static KafkaContainer newCluster() {
return new KafkaContainer(DockerImageName.parse("apache/kafka-native:3.8.1"));
}

public static KafkaTestClient newTestClient(final KafkaContainer kafkaContainer) {
return new KafkaTestClient(kafkaContainer.getBootstrapServers());
}

protected KafkaEndpointConfig createEndpointWithoutSchemaRegistry() {
return KafkaEndpointConfig.builder()
.bootstrapServers(this.getBootstrapServers())
Expand All @@ -62,7 +58,7 @@ protected String getBootstrapServers() {
}

protected KafkaTestClient newTestClient() {
return newTestClient(this.kafkaCluster);
return new KafkaTestClient(this.createEndpoint());
}

protected String getSchemaRegistryUrl() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,8 @@
import com.bakdata.kafka.util.ImprovedAdminClient;
import com.bakdata.kafka.util.TopicClient;
import com.bakdata.kafka.util.TopicSettings;
import java.util.Map;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
Expand All @@ -43,30 +41,26 @@ public class KafkaTestClient {
.partitions(1)
.replicationFactor((short) 1)
.build();
private final @NonNull String bootstrapServers;
private final @NonNull KafkaEndpointConfig endpointConfig;

public static TopicSettings defaultTopicSettings() {
return DEFAULT_TOPIC_SETTINGS;
}

public SenderBuilder send() {
return new SenderBuilder()
.with(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers)
return new SenderBuilder(this.endpointConfig.createKafkaProperties())
.with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class)
.with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
}

public ReaderBuilder read() {
return new ReaderBuilder()
.with(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers)
return new ReaderBuilder(this.endpointConfig.createKafkaProperties())
.with(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class)
.with(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
}

public ImprovedAdminClient admin() {
return ImprovedAdminClient.create(Map.of(
AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaTestClient.this.bootstrapServers
));
return ImprovedAdminClient.create(this.endpointConfig.createKafkaProperties());
}

public void createTopic(final String topicName, final TopicSettings settings) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,6 @@ public class ReaderBuilder {

private final @NonNull Map<String, Object> properties;

public ReaderBuilder() {
this(new HashMap<>());
}

private static <K, V> List<ConsumerRecord<K, V>> pollAll(final Consumer<K, V> consumer, final Duration timeout) {
final List<ConsumerRecord<K, V>> records = new ArrayList<>();
ConsumerRecords<K, V> poll;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,6 @@ public class SenderBuilder {

private final @NonNull Map<String, Object> properties;

public SenderBuilder() {
this(new HashMap<>());
}

public SenderBuilder with(final String key, final Object value) {
final Map<String, Object> newProperties = new HashMap<>(this.properties);
newProperties.put(key, value);
Expand Down Expand Up @@ -77,8 +73,8 @@ public SimpleProducerRecord(final K key, final V value, final Iterable<Header> h
}

private ProducerRecord<K, V> toProducerRecord(final String topic) {
final Long millis = this.timestamp == null ? null : this.timestamp.toEpochMilli();
return new ProducerRecord<>(topic, null, millis, this.key, this.value, this.headers);
final Long timestampMillis = this.timestamp == null ? null : this.timestamp.toEpochMilli();
return new ProducerRecord<>(topic, null, timestampMillis, this.key, this.value, this.headers);
}
}

Expand Down

0 comments on commit f353ec1

Please sign in to comment.