Skip to content

Commit

Permalink
Add dynamic config providers for IOConfig ConsumerProperties (#19)
Browse files Browse the repository at this point in the history
  • Loading branch information
vzayts authored Feb 27, 2024
1 parent c19c9dc commit 1bb0da8
Show file tree
Hide file tree
Showing 3 changed files with 152 additions and 2 deletions.
37 changes: 36 additions & 1 deletion common_spec_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,10 +262,27 @@ type IOConfig struct {
IdleConfig *IdleConfig `json:"idleConfig,omitempty"`
}

// EnvironmentVariableDynamicConfigProvider provides configuration values via environment variables.
type EnvironmentVariableDynamicConfigProvider struct {
Type string `json:"type"`
Variables map[string]string `json:"variables"`
}

// MapStringDynamicConfigProvider passes config values as a <key, value> map.
type MapStringDynamicConfigProvider struct {
Config map[string]string `json:"config"`
}

// DynamicConfigProvider is an umbrella type for different DynamicConfigProviders.
type DynamicConfigProvider struct {
Value any
}

// ConsumerProperties is a set of properties that is passed to a specific
// consumer, i.e. Kafka consumer.
type ConsumerProperties struct {
BootstrapServers string `json:"bootstrap.servers,omitempty"`
BootstrapServers string `json:"bootstrap.servers,omitempty"`
DruidDynamicConfigProvider *DynamicConfigProvider `json:"druid.dynamic.config.provider,omitempty"`
}

// InputFormat specifies kafka messages format type and describes any conversions applied to
Expand Down Expand Up @@ -385,3 +402,21 @@ func (g *DimensionSpec) UnmarshalJSON(b []byte) error {
func (g *DimensionSpec) MarshalJSON() ([]byte, error) {
return json.Marshal(&g.Value)
}

func (p *DynamicConfigProvider) UnmarshalJSON(b []byte) error {
var evcp EnvironmentVariableDynamicConfigProvider
if err := json.Unmarshal(b, &evcp); err == nil && evcp.Type == "environment" {
p.Value = evcp
return nil
}
var mcp MapStringDynamicConfigProvider
if err := json.Unmarshal(b, &mcp); err == nil {
p.Value = mcp
return nil
}
return fmt.Errorf("unsupported dynamic config provider: %s", b)
}

func (p *DynamicConfigProvider) MarshalJSON() ([]byte, error) {
return json.Marshal(&p.Value)
}
12 changes: 12 additions & 0 deletions supervisor_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,18 @@ func SetGranularitySpec(segmentGranularity string, queryGranularity *QueryGranul
}
}

func SetEnvironmentDynamicConfigProvider(dynamicConfig EnvironmentVariableDynamicConfigProvider) IngestionSpecOptions {
return func(spec *InputIngestionSpec) {
spec.IOConfig.ConsumerProperties.DruidDynamicConfigProvider = &DynamicConfigProvider{dynamicConfig}
}
}

func SetMapStringDynamicConfigProvider(dynamicConfig MapStringDynamicConfigProvider) IngestionSpecOptions {
return func(spec *InputIngestionSpec) {
spec.IOConfig.ConsumerProperties.DruidDynamicConfigProvider = &DynamicConfigProvider{dynamicConfig}
}
}

// SetSQLInputSource configures sql input source.
func SetSQLInputSource(dbType, connectURI, user, password string, sqls []string) IngestionSpecOptions {
return func(spec *InputIngestionSpec) {
Expand Down
105 changes: 104 additions & 1 deletion supervisor_types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,16 @@ var jsonBasic = `{
"ioConfig": {
"topic": "test_topic",
"consumerProperties": {
"bootstrap.servers": "test_brokers"
"bootstrap.servers": "test_brokers",
"druid.dynamic.config.provider": {
"type":"environment",
"variables": {
"sasl.jaas.config":"KAFKA_JAAS_CONFIG",
"ssl.key.password":"SSL_KEY_PASSWORD",
"ssl.keystore.password":"SSL_KEYSTORE_PASSWORD",
"ssl.truststore.password":"SSL_TRUSTSTORE_PASSWORD"
}
}
},
"taskDuration": "PT1H",
"useEarliestOffset": false,
Expand All @@ -101,6 +110,15 @@ var jsonBasic = `{
}`

func TestKafkaIngestionSpec_MarshalJSON(t *testing.T) {
configProvider := EnvironmentVariableDynamicConfigProvider{
Type: "environment",
Variables: map[string]string{
"sasl.jaas.config": "KAFKA_JAAS_CONFIG",
"ssl.key.password": "SSL_KEY_PASSWORD",
"ssl.keystore.password": "SSL_KEYSTORE_PASSWORD",
"ssl.truststore.password": "SSL_TRUSTSTORE_PASSWORD",
},
}
spec := NewIngestionSpec(
SetDataSource("test_datasource"),
SetTopic("test_topic"),
Expand All @@ -110,6 +128,7 @@ func TestKafkaIngestionSpec_MarshalJSON(t *testing.T) {
{"user_name"},
{"payload"},
}),
SetEnvironmentDynamicConfigProvider(configProvider),
)
actual, err := json.Marshal(spec)
if err != nil {
Expand All @@ -126,6 +145,90 @@ func TestKafkaIngestionSpec_MarshalJSON(t *testing.T) {
require.Equal(t, spec, checkSpec)
}

var jsonWithMapStringConfigProvider = `{
"type": "kafka",
"dataSchema": {
"dataSource": "test_datasource",
"timestampSpec": {
"column": "ts",
"format": "auto"
},
"transformSpec": {
"transforms": []
},
"dimensionsSpec": {
"dimensions": [
"ts",
"user_name",
"payload"
]
},
"granularitySpec": {
"type": "uniform",
"segmentGranularity": "DAY",
"queryGranularity": "none",
"rollup": false
}
},
"ioConfig": {
"topic": "test_topic",
"consumerProperties": {
"bootstrap.servers": "test_brokers",
"druid.dynamic.config.provider": {
"config": {
"sasl.jaas.config":"org.apache.kafka.common.security.plain.PlainLoginModule required username='admin_user' password='admin_password';",
"ssl.key.password":"ssl_key_password",
"ssl.keystore.password":"ssl_keystore_password",
"ssl.truststore.password":"ssl_truststore_password"
}
}
},
"taskDuration": "PT1H",
"useEarliestOffset": false,
"flattenSpec": {
"fields": []
},
"inputFormat": {
"type": "json"
}
}
}`

func TestKafkaIngestionSpecWithMapStringConfigProvider_MarshalJSON(t *testing.T) {
configProvider := MapStringDynamicConfigProvider{
Config: map[string]string{
"sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin_user' password='admin_password';",
"ssl.key.password": "ssl_key_password",
"ssl.keystore.password": "ssl_keystore_password",
"ssl.truststore.password": "ssl_truststore_password",
},
}
spec := NewIngestionSpec(
SetDataSource("test_datasource"),
SetTopic("test_topic"),
SetBrokers("test_brokers"),
SetDimensions(DimensionSet{
{"ts"},
{"user_name"},
{"payload"},
}),
SetMapStringDynamicConfigProvider(configProvider),
)
actual, err := json.Marshal(spec)
if err != nil {
t.Fatalf("unexpected error while marshalling: %v", err)
}
expected := []byte(jsonWithMapStringConfigProvider)
require.JSONEq(t, string(expected), string(actual), fmt.Sprintf("expected: %s\nactual: %s", string(expected), string(actual)))

var checkSpec *InputIngestionSpec
err = json.Unmarshal(actual, &checkSpec)
if err != nil {
t.Fatalf("unexpected error while unmarshalling: %v", err)
}
require.Equal(t, spec, checkSpec)
}

var jsonWithTypedDimensions = `{
"type": "kafka",
"dataSchema": {
Expand Down

0 comments on commit 1bb0da8

Please sign in to comment.