diff --git a/common_spec_types.go b/common_spec_types.go index 9bada0f..adc0a82 100644 --- a/common_spec_types.go +++ b/common_spec_types.go @@ -262,10 +262,27 @@ type IOConfig struct { IdleConfig *IdleConfig `json:"idleConfig,omitempty"` } +// EnvironmentVariableDynamicConfigProvider provides configuration values via environment variables. +type EnvironmentVariableDynamicConfigProvider struct { + Type string `json:"type"` + Variables map[string]string `json:"variables"` +} + +// MapStringDynamicConfigProvider passes config values as a map. +type MapStringDynamicConfigProvider struct { + Config map[string]string `json:"config"` +} + +// DynamicConfigProvider is an umbrella type for different DynamicConfigProviders. +type DynamicConfigProvider struct { + Value any +} + // ConsumerProperties is a set of properties that is passed to a specific // consumer, i.e. Kafka consumer. type ConsumerProperties struct { - BootstrapServers string `json:"bootstrap.servers,omitempty"` + BootstrapServers string `json:"bootstrap.servers,omitempty"` + DruidDynamicConfigProvider *DynamicConfigProvider `json:"druid.dynamic.config.provider,omitempty"` } // InputFormat specifies kafka messages format type and describes any conversions applied to @@ -385,3 +402,21 @@ func (g *DimensionSpec) UnmarshalJSON(b []byte) error { func (g *DimensionSpec) MarshalJSON() ([]byte, error) { return json.Marshal(&g.Value) } + +func (p *DynamicConfigProvider) UnmarshalJSON(b []byte) error { + var evcp EnvironmentVariableDynamicConfigProvider + if err := json.Unmarshal(b, &evcp); err == nil && evcp.Type == "environment" { + p.Value = evcp + return nil + } + var mcp MapStringDynamicConfigProvider + if err := json.Unmarshal(b, &mcp); err == nil { + p.Value = mcp + return nil + } + return fmt.Errorf("unsupported dynamic config provider: %s", b) +} + +func (p *DynamicConfigProvider) MarshalJSON() ([]byte, error) { + return json.Marshal(&p.Value) +} diff --git a/supervisor_types.go b/supervisor_types.go index 5ef2b83..bbd03a5 100644 --- a/supervisor_types.go +++ b/supervisor_types.go @@ -230,6 +230,18 @@ func SetGranularitySpec(segmentGranularity string, queryGranularity *QueryGranul } } +func SetEnvironmentDynamicConfigProvider(dynamicConfig EnvironmentVariableDynamicConfigProvider) IngestionSpecOptions { + return func(spec *InputIngestionSpec) { + spec.IOConfig.ConsumerProperties.DruidDynamicConfigProvider = &DynamicConfigProvider{dynamicConfig} + } +} + +func SetMapStringDynamicConfigProvider(dynamicConfig MapStringDynamicConfigProvider) IngestionSpecOptions { + return func(spec *InputIngestionSpec) { + spec.IOConfig.ConsumerProperties.DruidDynamicConfigProvider = &DynamicConfigProvider{dynamicConfig} + } +} + // SetSQLInputSource configures sql input source. func SetSQLInputSource(dbType, connectURI, user, password string, sqls []string) IngestionSpecOptions { return func(spec *InputIngestionSpec) { diff --git a/supervisor_types_test.go b/supervisor_types_test.go index 84dcd28..49b3152 100644 --- a/supervisor_types_test.go +++ b/supervisor_types_test.go @@ -87,7 +87,16 @@ var jsonBasic = `{ "ioConfig": { "topic": "test_topic", "consumerProperties": { - "bootstrap.servers": "test_brokers" + "bootstrap.servers": "test_brokers", + "druid.dynamic.config.provider": { + "type":"environment", + "variables": { + "sasl.jaas.config":"KAFKA_JAAS_CONFIG", + "ssl.key.password":"SSL_KEY_PASSWORD", + "ssl.keystore.password":"SSL_KEYSTORE_PASSWORD", + "ssl.truststore.password":"SSL_TRUSTSTORE_PASSWORD" + } + } }, "taskDuration": "PT1H", "useEarliestOffset": false, @@ -101,6 +110,15 @@ var jsonBasic = `{ }` func TestKafkaIngestionSpec_MarshalJSON(t *testing.T) { + configProvider := EnvironmentVariableDynamicConfigProvider{ + Type: "environment", + Variables: map[string]string{ + "sasl.jaas.config": "KAFKA_JAAS_CONFIG", + "ssl.key.password": "SSL_KEY_PASSWORD", + "ssl.keystore.password": "SSL_KEYSTORE_PASSWORD", + "ssl.truststore.password": "SSL_TRUSTSTORE_PASSWORD", + }, + } spec := NewIngestionSpec( SetDataSource("test_datasource"), SetTopic("test_topic"), @@ -110,6 +128,7 @@ func TestKafkaIngestionSpec_MarshalJSON(t *testing.T) { {"user_name"}, {"payload"}, }), + SetEnvironmentDynamicConfigProvider(configProvider), ) actual, err := json.Marshal(spec) if err != nil { @@ -126,6 +145,90 @@ func TestKafkaIngestionSpec_MarshalJSON(t *testing.T) { require.Equal(t, spec, checkSpec) } +var jsonWithMapStringConfigProvider = `{ + "type": "kafka", + "dataSchema": { + "dataSource": "test_datasource", + "timestampSpec": { + "column": "ts", + "format": "auto" + }, + "transformSpec": { + "transforms": [] + }, + "dimensionsSpec": { + "dimensions": [ + "ts", + "user_name", + "payload" + ] + }, + "granularitySpec": { + "type": "uniform", + "segmentGranularity": "DAY", + "queryGranularity": "none", + "rollup": false + } + }, + "ioConfig": { + "topic": "test_topic", + "consumerProperties": { + "bootstrap.servers": "test_brokers", + "druid.dynamic.config.provider": { + "config": { + "sasl.jaas.config":"org.apache.kafka.common.security.plain.PlainLoginModule required username='admin_user' password='admin_password';", + "ssl.key.password":"ssl_key_password", + "ssl.keystore.password":"ssl_keystore_password", + "ssl.truststore.password":"ssl_truststore_password" + } + } + }, + "taskDuration": "PT1H", + "useEarliestOffset": false, + "flattenSpec": { + "fields": [] + }, + "inputFormat": { + "type": "json" + } + } +}` + +func TestKafkaIngestionSpecWithMapStringConfigProvider_MarshalJSON(t *testing.T) { + configProvider := MapStringDynamicConfigProvider{ + Config: map[string]string{ + "sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin_user' password='admin_password';", + "ssl.key.password": "ssl_key_password", + "ssl.keystore.password": "ssl_keystore_password", + "ssl.truststore.password": "ssl_truststore_password", + }, + } + spec := NewIngestionSpec( + SetDataSource("test_datasource"), + SetTopic("test_topic"), + SetBrokers("test_brokers"), + SetDimensions(DimensionSet{ + {"ts"}, + {"user_name"}, + {"payload"}, + }), + SetMapStringDynamicConfigProvider(configProvider), + ) + actual, err := json.Marshal(spec) + if err != nil { + t.Fatalf("unexpected error while marshalling: %v", err) + } + expected := []byte(jsonWithMapStringConfigProvider) + require.JSONEq(t, string(expected), string(actual), fmt.Sprintf("expected: %s\nactual: %s", string(expected), string(actual))) + + var checkSpec *InputIngestionSpec + err = json.Unmarshal(actual, &checkSpec) + if err != nil { + t.Fatalf("unexpected error while unmarshalling: %v", err) + } + require.Equal(t, spec, checkSpec) +} + var jsonWithTypedDimensions = `{ "type": "kafka", "dataSchema": {