From abda0efde6b4ea43d449ffed517a058e89bb17f4 Mon Sep 17 00:00:00 2001 From: Robin Date: Wed, 13 Mar 2024 21:42:31 +0100 Subject: [PATCH] move and create defaults for configs --- .../streams/v2/config/SchemaRegistryConfig.kt | 17 ++++++++++++ .../aap/kafka/streams/v2/config/SslConfig.kt | 18 ++++--------- .../kafka/streams/v2/config/StreamsConfig.kt | 16 ++++++----- .../streams/v2/consumer/ConsumerConfig.kt | 3 +-- .../streams/v2/producer/ProducerConfig.kt | 2 +- .../no/nav/aap/kafka/streams/v2/TestUtils.kt | 3 ++- .../schemaregistry/SchemaRegistryConfig.kt | 27 ------------------- .../no/nav/aap/kafka/serde/avro/AvroSerde.kt | 6 ++--- .../test/no/nav/aap/kafka/ConfigTest.kt | 20 ++------------ .../no/nav/aap/kafka/streams/v2/test/Test.kt | 2 ++ 10 files changed, 42 insertions(+), 72 deletions(-) create mode 100644 kafka-2/main/no/nav/aap/kafka/streams/v2/config/SchemaRegistryConfig.kt delete mode 100644 kafka-avroserde/main/no/nav/aap/kafka/schemaregistry/SchemaRegistryConfig.kt diff --git a/kafka-2/main/no/nav/aap/kafka/streams/v2/config/SchemaRegistryConfig.kt b/kafka-2/main/no/nav/aap/kafka/streams/v2/config/SchemaRegistryConfig.kt new file mode 100644 index 00000000..7237ca7d --- /dev/null +++ b/kafka-2/main/no/nav/aap/kafka/streams/v2/config/SchemaRegistryConfig.kt @@ -0,0 +1,17 @@ +package no.nav.aap.kafka.streams.v2.config + +import java.util.* + +private fun getEnvVar(envar: String) = System.getenv(envar) ?: error("missing envvar $envar") + +data class SchemaRegistryConfig( + private val url: String = getEnvVar("KAFKA_SCHEMA_REGISTRY"), + private val user: String = getEnvVar("KAFKA_SCHEMA_REGISTRY_USER"), + private val password: String = getEnvVar("KAFKA_SCHEMA_REGISTRY_PASSWORD"), +) { + fun properties() = Properties().apply { + this["schema.registry.url"] = url + this["basic.auth.credentials.source"] = "USER_INFO" + this["basic.auth.user.info"] = "$user:$password" + } +} diff --git a/kafka-2/main/no/nav/aap/kafka/streams/v2/config/SslConfig.kt b/kafka-2/main/no/nav/aap/kafka/streams/v2/config/SslConfig.kt index e5c0b15b..76445c34 100644 --- a/kafka-2/main/no/nav/aap/kafka/streams/v2/config/SslConfig.kt +++ b/kafka-2/main/no/nav/aap/kafka/streams/v2/config/SslConfig.kt @@ -4,10 +4,12 @@ import org.apache.kafka.clients.CommonClientConfigs import org.apache.kafka.common.config.SslConfigs import java.util.* +private fun getEnvVar(envar: String) = System.getenv(envar) ?: error("missing envvar $envar") + data class SslConfig( - private val truststorePath: String, - private val keystorePath: String, - private val credstorePsw: String, + private val truststorePath: String = getEnvVar("KAFKA_TRUSTSTORE_PATH"), + private val keystorePath: String = getEnvVar("KAFKA_KEYSTORE_PATH"), + private val credstorePsw: String = getEnvVar("KAFKA_CREDSTORE_PASSWORD"), ) { fun properties() = Properties().apply { this[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = "SSL" @@ -20,14 +22,4 @@ data class SslConfig( this[SslConfigs.SSL_KEY_PASSWORD_CONFIG] = credstorePsw this[SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG] = "" } - - companion object { - val DEFAULT: SslConfig by lazy { - SslConfig( - truststorePath = System.getenv("KAFKA_TRUSTSTORE_PATH"), - keystorePath = System.getenv("KAFKA_KEYSTORE_PATH"), - credstorePsw = System.getenv("KAFKA_CREDSTORE_PASSWORD") - ) - } - } } diff --git a/kafka-2/main/no/nav/aap/kafka/streams/v2/config/StreamsConfig.kt b/kafka-2/main/no/nav/aap/kafka/streams/v2/config/StreamsConfig.kt index 40cf6ec5..d0933ab5 100644 --- a/kafka-2/main/no/nav/aap/kafka/streams/v2/config/StreamsConfig.kt +++ b/kafka-2/main/no/nav/aap/kafka/streams/v2/config/StreamsConfig.kt @@ -7,20 +7,22 @@ import org.apache.kafka.clients.producer.ProducerConfig import org.apache.kafka.streams.StreamsConfig import java.util.* +private fun getEnvVar(envar: String) = System.getenv(envar) ?: error("missing envvar $envar") + data class StreamsConfig( - internal val applicationId: String, - internal val brokers: String, - internal val compressionType: String = "snappy", - internal val ssl: SslConfig? = null, - internal val schemaRegistry: Properties = Properties(), - internal val additionalProperties: Properties = Properties(), + val applicationId: String = getEnvVar("KAFKA_STREAMS_APPLICATION_ID"), + val brokers: String = getEnvVar("KAFKA_BROKERS"), + val ssl: SslConfig? = SslConfig(), + val schemaRegistry: SchemaRegistryConfig = SchemaRegistryConfig(), + val compressionType: String = "snappy", + val additionalProperties: Properties = Properties(), ) { fun streamsProperties(): Properties = Properties().apply { this[StreamsConfig.APPLICATION_ID_CONFIG] = applicationId this[CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG] = brokers ssl?.let { putAll(it.properties()) } - putAll(schemaRegistry) + putAll(schemaRegistry.properties()) putAll(additionalProperties) /* Exception handler when leaving the stream, e.g. serialization */ diff --git a/kafka-2/main/no/nav/aap/kafka/streams/v2/consumer/ConsumerConfig.kt b/kafka-2/main/no/nav/aap/kafka/streams/v2/consumer/ConsumerConfig.kt index c320be20..ba420b95 100644 --- a/kafka-2/main/no/nav/aap/kafka/streams/v2/consumer/ConsumerConfig.kt +++ b/kafka-2/main/no/nav/aap/kafka/streams/v2/consumer/ConsumerConfig.kt @@ -19,9 +19,8 @@ class ConsumerConfig private constructor( constructor(streamsConfig: StreamsConfig) : this( brokers = streamsConfig.brokers, - ssl = streamsConfig.ssl, - schemaRegistry = streamsConfig.schemaRegistry, + schemaRegistry = streamsConfig.schemaRegistry.properties(), ) internal fun toProperties( diff --git a/kafka-2/main/no/nav/aap/kafka/streams/v2/producer/ProducerConfig.kt b/kafka-2/main/no/nav/aap/kafka/streams/v2/producer/ProducerConfig.kt index 021b97e8..9608b006 100644 --- a/kafka-2/main/no/nav/aap/kafka/streams/v2/producer/ProducerConfig.kt +++ b/kafka-2/main/no/nav/aap/kafka/streams/v2/producer/ProducerConfig.kt @@ -16,7 +16,7 @@ class ProducerConfig private constructor( constructor(streamsConfig: StreamsConfig) : this( brokers = streamsConfig.brokers, ssl = streamsConfig.ssl, - schemaRegistry = streamsConfig.schemaRegistry, + schemaRegistry = streamsConfig.schemaRegistry.properties(), compressionType = streamsConfig.compressionType ) diff --git a/kafka-2/test/no/nav/aap/kafka/streams/v2/TestUtils.kt b/kafka-2/test/no/nav/aap/kafka/streams/v2/TestUtils.kt index 3dbf82b7..bd460fbe 100644 --- a/kafka-2/test/no/nav/aap/kafka/streams/v2/TestUtils.kt +++ b/kafka-2/test/no/nav/aap/kafka/streams/v2/TestUtils.kt @@ -4,6 +4,7 @@ import io.micrometer.core.instrument.MeterRegistry import io.micrometer.core.instrument.simple.SimpleMeterRegistry import no.nav.aap.kafka.serde.json.Migratable import no.nav.aap.kafka.streams.concurrency.Bufferable +import no.nav.aap.kafka.streams.v2.config.SchemaRegistryConfig import no.nav.aap.kafka.streams.v2.config.StreamsConfig import no.nav.aap.kafka.streams.v2.processor.Processor import no.nav.aap.kafka.streams.v2.processor.ProcessorMetadata @@ -60,7 +61,7 @@ internal class StreamsMock : Streams { StreamsMock().apply { connect( topology = Topology().apply(topology), - config = StreamsConfig("", ""), + config = StreamsConfig("", "", null, SchemaRegistryConfig("", "", "")), registry = SimpleMeterRegistry() ) } diff --git a/kafka-avroserde/main/no/nav/aap/kafka/schemaregistry/SchemaRegistryConfig.kt b/kafka-avroserde/main/no/nav/aap/kafka/schemaregistry/SchemaRegistryConfig.kt deleted file mode 100644 index 0635a15b..00000000 --- a/kafka-avroserde/main/no/nav/aap/kafka/schemaregistry/SchemaRegistryConfig.kt +++ /dev/null @@ -1,27 +0,0 @@ -package no.nav.aap.kafka.schemaregistry - -import io.confluent.kafka.schemaregistry.client.SchemaRegistryClientConfig -import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig -import java.util.* - -data class SchemaRegistryConfig( - private val url: String, - private val user: String, - private val password: String, -) { - fun properties() = Properties().apply { - this[AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG] = url - this[SchemaRegistryClientConfig.BASIC_AUTH_CREDENTIALS_SOURCE] = "USER_INFO" - this[SchemaRegistryClientConfig.USER_INFO_CONFIG] = "$user:$password" - } - - companion object { - val DEFAULT: SchemaRegistryConfig by lazy { - SchemaRegistryConfig( - url = System.getenv("KAFKA_SCHEMA_REGISTRY"), - user = System.getenv("KAFKA_SCHEMA_REGISTRY_USER"), - password = System.getenv("KAFKA_SCHEMA_REGISTRY_PASSWORD"), - ) - } - } -} diff --git a/kafka-avroserde/main/no/nav/aap/kafka/serde/avro/AvroSerde.kt b/kafka-avroserde/main/no/nav/aap/kafka/serde/avro/AvroSerde.kt index 8b3fc380..85c584c5 100644 --- a/kafka-avroserde/main/no/nav/aap/kafka/serde/avro/AvroSerde.kt +++ b/kafka-avroserde/main/no/nav/aap/kafka/serde/avro/AvroSerde.kt @@ -2,14 +2,14 @@ package no.nav.aap.kafka.serde.avro import io.confluent.kafka.streams.serdes.avro.GenericAvroSerde import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde -import no.nav.aap.kafka.schemaregistry.SchemaRegistryConfig +import no.nav.aap.kafka.streams.v2.config.SchemaRegistryConfig import no.nav.aap.kafka.streams.v2.config.SslConfig import org.apache.avro.specific.SpecificRecord object AvroSerde { fun specific( - schema: SchemaRegistryConfig = SchemaRegistryConfig.DEFAULT, - ssl: SslConfig = SslConfig.DEFAULT, + schema: SchemaRegistryConfig = SchemaRegistryConfig(), + ssl: SslConfig = SslConfig(), ): SpecificAvroSerde = SpecificAvroSerde().apply { val properties = schema.properties() + ssl.properties() val serdeConfig = properties.mapKeys { it.key.toString() } diff --git a/kafka-avroserde/test/no/nav/aap/kafka/ConfigTest.kt b/kafka-avroserde/test/no/nav/aap/kafka/ConfigTest.kt index ee0d8574..0eac41a6 100644 --- a/kafka-avroserde/test/no/nav/aap/kafka/ConfigTest.kt +++ b/kafka-avroserde/test/no/nav/aap/kafka/ConfigTest.kt @@ -2,36 +2,20 @@ package no.nav.aap.kafka import io.confluent.kafka.schemaregistry.client.SchemaRegistryClientConfig import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig -import no.nav.aap.kafka.schemaregistry.SchemaRegistryConfig +import no.nav.aap.kafka.streams.v2.config.SchemaRegistryConfig import no.nav.aap.kafka.streams.v2.config.SslConfig import no.nav.aap.kafka.streams.v2.config.StreamsConfig import org.junit.jupiter.api.Test import kotlin.test.assertEquals -import kotlin.test.assertNull internal class ConfigTest { - @Test - fun `schemaRegistry is empty without schema registry url`() { - val config = StreamsConfig( - applicationId = "app", - brokers = "localhost:9092", - ssl = SslConfig("", "", "") - ) - - config.streamsProperties().apply { - assertNull(this[AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG]) - assertNull(this[SchemaRegistryClientConfig.BASIC_AUTH_CREDENTIALS_SOURCE]) - assertNull(this[SchemaRegistryClientConfig.USER_INFO_CONFIG]) - } - } - @Test fun `schema registry config is configured when present`() { val config = StreamsConfig( applicationId = "app", brokers = "localhost:9092", - schemaRegistry = schemaConfig.properties(), + schemaRegistry = schemaConfig, ssl = SslConfig("", "", "") ) diff --git a/kafka-test-2/test/no/nav/aap/kafka/streams/v2/test/Test.kt b/kafka-test-2/test/no/nav/aap/kafka/streams/v2/test/Test.kt index e43d3c73..82d9d3ba 100644 --- a/kafka-test-2/test/no/nav/aap/kafka/streams/v2/test/Test.kt +++ b/kafka-test-2/test/no/nav/aap/kafka/streams/v2/test/Test.kt @@ -3,6 +3,7 @@ package no.nav.aap.kafka.streams.v2.test import io.micrometer.core.instrument.simple.SimpleMeterRegistry import no.nav.aap.kafka.streams.v2.Table import no.nav.aap.kafka.streams.v2.Topic +import no.nav.aap.kafka.streams.v2.config.SchemaRegistryConfig import no.nav.aap.kafka.streams.v2.config.SslConfig import no.nav.aap.kafka.streams.v2.config.StreamsConfig import no.nav.aap.kafka.streams.v2.serde.StringSerde @@ -28,6 +29,7 @@ internal class Test { applicationId = "app", brokers = "mock://kafka", ssl = SslConfig("", "", ""), + schemaRegistry = SchemaRegistryConfig("", "", ""), ) kafka.connect(topology, config, registry)