diff --git a/gradle.properties b/gradle.properties index 78503c3..8086c18 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1 +1 @@ -version=0.7.0-SNAPSHOT +version=0.7.1-SNAPSHOT diff --git a/src/main/java/io/aiven/kafka/connect/http/config/HttpSinkConfig.java b/src/main/java/io/aiven/kafka/connect/http/config/HttpSinkConfig.java index 8cd2657..0a3e782 100644 --- a/src/main/java/io/aiven/kafka/connect/http/config/HttpSinkConfig.java +++ b/src/main/java/io/aiven/kafka/connect/http/config/HttpSinkConfig.java @@ -50,6 +50,7 @@ public class HttpSinkConfig extends AbstractConfig { private static final String OAUTH2_CLIENT_ID_CONFIG = "oauth2.client.id"; private static final String OAUTH2_CLIENT_SECRET_CONFIG = "oauth2.client.secret"; private static final String OAUTH2_CLIENT_AUTHORIZATION_MODE_CONFIG = "oauth2.client.authorization.mode"; + private static final String OAUTH2_CLIENT_AUDIENCE_CONFIG = "oauth2.client.audience"; private static final String OAUTH2_CLIENT_SCOPE_CONFIG = "oauth2.client.scope"; private static final String OAUTH2_RESPONSE_TOKEN_PROPERTY_CONFIG = "oauth2.response.token.property"; @@ -199,8 +200,8 @@ public boolean visible(final String name, final Map parsedConfig ConfigDef.Width.LONG, OAUTH2_ACCESS_TOKEN_URL_CONFIG, List.of(OAUTH2_CLIENT_ID_CONFIG, OAUTH2_CLIENT_SECRET_CONFIG, - OAUTH2_CLIENT_AUTHORIZATION_MODE_CONFIG, OAUTH2_CLIENT_SCOPE_CONFIG, - OAUTH2_RESPONSE_TOKEN_PROPERTY_CONFIG) + OAUTH2_CLIENT_AUTHORIZATION_MODE_CONFIG, OAUTH2_CLIENT_AUDIENCE_CONFIG, + OAUTH2_CLIENT_SCOPE_CONFIG, OAUTH2_RESPONSE_TOKEN_PROPERTY_CONFIG) ); configDef.define( OAUTH2_CLIENT_ID_CONFIG, @@ -219,7 +220,7 @@ public String toString() { ConfigDef.Width.LONG, OAUTH2_CLIENT_ID_CONFIG, List.of(OAUTH2_ACCESS_TOKEN_URL_CONFIG, OAUTH2_CLIENT_SECRET_CONFIG, - OAUTH2_CLIENT_AUTHORIZATION_MODE_CONFIG, + OAUTH2_CLIENT_AUTHORIZATION_MODE_CONFIG, OAUTH2_CLIENT_AUDIENCE_CONFIG, OAUTH2_CLIENT_SCOPE_CONFIG, OAUTH2_RESPONSE_TOKEN_PROPERTY_CONFIG) ); configDef.define( @@ -233,7 +234,7 @@ public String toString() { ConfigDef.Width.LONG, OAUTH2_CLIENT_SECRET_CONFIG, List.of(OAUTH2_ACCESS_TOKEN_URL_CONFIG, OAUTH2_CLIENT_ID_CONFIG, - OAUTH2_CLIENT_AUTHORIZATION_MODE_CONFIG, + OAUTH2_CLIENT_AUTHORIZATION_MODE_CONFIG, OAUTH2_CLIENT_AUDIENCE_CONFIG, OAUTH2_CLIENT_SCOPE_CONFIG, OAUTH2_RESPONSE_TOKEN_PROPERTY_CONFIG) ); configDef.define( @@ -273,7 +274,28 @@ public String toString() { ConfigDef.Width.LONG, OAUTH2_CLIENT_AUTHORIZATION_MODE_CONFIG, List.of(OAUTH2_ACCESS_TOKEN_URL_CONFIG, OAUTH2_CLIENT_ID_CONFIG, OAUTH2_CLIENT_SECRET_CONFIG, - OAUTH2_CLIENT_SCOPE_CONFIG, OAUTH2_RESPONSE_TOKEN_PROPERTY_CONFIG) + OAUTH2_CLIENT_AUDIENCE_CONFIG, OAUTH2_CLIENT_SCOPE_CONFIG, + OAUTH2_RESPONSE_TOKEN_PROPERTY_CONFIG) + ); + configDef.define( + OAUTH2_CLIENT_AUDIENCE_CONFIG, + ConfigDef.Type.STRING, + null, + new ConfigDef.NonEmptyStringWithoutControlChars() { + @Override + public String toString() { + return "OAuth2 client audience"; + } + }, + ConfigDef.Importance.LOW, + "The audience used for fetching an access token.", + CONNECTION_GROUP, + groupCounter++, + ConfigDef.Width.LONG, + OAUTH2_CLIENT_AUDIENCE_CONFIG, + List.of(OAUTH2_ACCESS_TOKEN_URL_CONFIG, OAUTH2_CLIENT_ID_CONFIG, + OAUTH2_CLIENT_SECRET_CONFIG, OAUTH2_CLIENT_SCOPE_CONFIG, + OAUTH2_CLIENT_AUTHORIZATION_MODE_CONFIG, OAUTH2_RESPONSE_TOKEN_PROPERTY_CONFIG) ); configDef.define( OAUTH2_CLIENT_SCOPE_CONFIG, @@ -291,7 +313,8 @@ public String toString() { groupCounter++, ConfigDef.Width.LONG, OAUTH2_CLIENT_SCOPE_CONFIG, - List.of(OAUTH2_ACCESS_TOKEN_URL_CONFIG, OAUTH2_CLIENT_ID_CONFIG, OAUTH2_CLIENT_SECRET_CONFIG, + List.of(OAUTH2_ACCESS_TOKEN_URL_CONFIG, OAUTH2_CLIENT_ID_CONFIG, + OAUTH2_CLIENT_SECRET_CONFIG, OAUTH2_CLIENT_AUDIENCE_CONFIG, OAUTH2_CLIENT_AUTHORIZATION_MODE_CONFIG, OAUTH2_RESPONSE_TOKEN_PROPERTY_CONFIG) ); configDef.define( @@ -312,7 +335,8 @@ public String toString() { ConfigDef.Width.LONG, OAUTH2_RESPONSE_TOKEN_PROPERTY_CONFIG, List.of(OAUTH2_ACCESS_TOKEN_URL_CONFIG, OAUTH2_CLIENT_ID_CONFIG, OAUTH2_CLIENT_SECRET_CONFIG, - OAUTH2_CLIENT_AUTHORIZATION_MODE_CONFIG, OAUTH2_CLIENT_SCOPE_CONFIG) + OAUTH2_CLIENT_AUTHORIZATION_MODE_CONFIG, OAUTH2_CLIENT_AUDIENCE_CONFIG, + OAUTH2_CLIENT_SCOPE_CONFIG) ); } @@ -643,6 +667,10 @@ public final OAuth2AuthorizationMode oauth2AuthorizationMode() { return OAuth2AuthorizationMode.valueOf(getString(OAUTH2_CLIENT_AUTHORIZATION_MODE_CONFIG).toUpperCase()); } + public final String oauth2ClientAudience() { + return getString(OAUTH2_CLIENT_AUDIENCE_CONFIG); + } + public final String oauth2ClientScope() { return getString(OAUTH2_CLIENT_SCOPE_CONFIG); } diff --git a/src/main/java/io/aiven/kafka/connect/http/sender/AccessTokenHttpRequestBuilder.java b/src/main/java/io/aiven/kafka/connect/http/sender/AccessTokenHttpRequestBuilder.java index 2d080c9..82823d8 100644 --- a/src/main/java/io/aiven/kafka/connect/http/sender/AccessTokenHttpRequestBuilder.java +++ b/src/main/java/io/aiven/kafka/connect/http/sender/AccessTokenHttpRequestBuilder.java @@ -47,6 +47,10 @@ public HttpRequest.Builder build(final HttpSinkConfig config) { if (config.oauth2ClientScope() != null) { accessTokenRequestBodyBuilder.add(encodeNameAndValue("scope", config.oauth2ClientScope())); } + if (config.oauth2ClientAudience() != null) { + accessTokenRequestBodyBuilder.add(encodeNameAndValue("audience", config + .oauth2ClientAudience())); + } setClientIdAndSecret(accessTokenRequestBuilder, accessTokenRequestBodyBuilder, config); return accessTokenRequestBuilder diff --git a/src/test/java/io/aiven/kafka/connect/http/config/HttpSinkConfigTest.java b/src/test/java/io/aiven/kafka/connect/http/config/HttpSinkConfigTest.java index 68df2b3..42308c1 100644 --- a/src/test/java/io/aiven/kafka/connect/http/config/HttpSinkConfigTest.java +++ b/src/test/java/io/aiven/kafka/connect/http/config/HttpSinkConfigTest.java @@ -71,6 +71,7 @@ void correctMinimalConfig() throws URISyntaxException { .returns(null, from(HttpSinkConfig::oauth2AccessTokenUri)) .returns(null, from(HttpSinkConfig::oauth2ClientId)) .returns(null, from(HttpSinkConfig::oauth2ClientSecret)) + .returns(null, from(HttpSinkConfig::oauth2ClientAudience)) .returns(null, from(HttpSinkConfig::oauth2ClientScope)) .returns(OAuth2AuthorizationMode.HEADER, from(HttpSinkConfig::oauth2AuthorizationMode)) .returns("access_token", from(HttpSinkConfig::oauth2ResponseTokenProperty)) @@ -176,6 +177,7 @@ void validOAuth2MinimalConfiguration() throws URISyntaxException { .returns(new URI("http://localhost:8090/token"), from(HttpSinkConfig::oauth2AccessTokenUri)) .returns("client_id", from(HttpSinkConfig::oauth2ClientId)) .returns("client_secret", from(httpSinkConfig -> httpSinkConfig.oauth2ClientSecret().value())) + .returns(null, from(HttpSinkConfig::oauth2ClientAudience)) .returns(null, from(HttpSinkConfig::oauth2ClientScope)) .returns(OAuth2AuthorizationMode.HEADER, from(HttpSinkConfig::oauth2AuthorizationMode)) .returns("access_token", from(HttpSinkConfig::oauth2ResponseTokenProperty)); @@ -191,6 +193,7 @@ void validOAuth2FullConfiguration() throws URISyntaxException { "oauth2.client.id", "client_id", "oauth2.client.secret", "client_secret", "oauth2.client.authorization.mode", "url", + "oauth2.client.audience", "http://localhost:8000/api", "oauth2.client.scope", "scope1,scope2", "oauth2.response.token.property", "moooooo" ); @@ -201,6 +204,7 @@ void validOAuth2FullConfiguration() throws URISyntaxException { .returns(new URI("http://localhost:8090/token"), from(HttpSinkConfig::oauth2AccessTokenUri)) .returns("client_id", from(HttpSinkConfig::oauth2ClientId)) .returns("client_secret", from(httpSinkConfig -> httpSinkConfig.oauth2ClientSecret().value())) + .returns("http://localhost:8000/api", from(HttpSinkConfig::oauth2ClientAudience)) .returns("scope1,scope2", from(HttpSinkConfig::oauth2ClientScope)) .returns(OAuth2AuthorizationMode.URL, from(HttpSinkConfig::oauth2AuthorizationMode)) .returns("moooooo", from(HttpSinkConfig::oauth2ResponseTokenProperty));