Skip to content

Commit

Permalink
Modified kubernetes openapi http client filter to support reactive an…
Browse files Browse the repository at this point in the history
…d blocking token loaders (#850)
  • Loading branch information
msupic authored Feb 19, 2025
1 parent b40c35b commit ef46344
Show file tree
Hide file tree
Showing 11 changed files with 242 additions and 19 deletions.
1 change: 1 addition & 0 deletions config/checkstyle/suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,5 @@
<!-- files="DefaultBeanContext.java|BeanDefinitionWriter.java|DefaultHttpClient.java"/> -->

<suppress checks="MissingJavadocType" files=".*doc-examples.*" />
<suppress checks="." files="generated[\\/]openapi[\\/]" />
</suppressions>
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.micronaut.context.annotation.BootstrapContextCompatible;
import io.micronaut.context.annotation.Requires;
import io.micronaut.core.annotation.Internal;
import io.micronaut.core.annotation.Nullable;
import io.micronaut.core.util.StringUtils;
import io.micronaut.http.HttpResponse;
import io.micronaut.http.MutableHttpRequest;
Expand All @@ -31,14 +32,21 @@
import io.micronaut.kubernetes.client.openapi.config.KubernetesClientConfiguration;
import io.micronaut.kubernetes.client.openapi.config.model.AuthInfo;
import io.micronaut.kubernetes.client.openapi.credential.KubernetesTokenLoader;
import io.micronaut.kubernetes.client.openapi.credential.ReactiveKubernetesTokenLoader;
import io.micronaut.kubernetes.client.openapi.credential.TokenLoader;
import io.micronaut.scheduling.TaskExecutors;
import jakarta.inject.Named;
import jakarta.inject.Provider;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

import java.util.Collection;
import java.util.concurrent.ExecutorService;

/**
* Filter which sets the authorization request header with basic or bearer token
Expand All @@ -52,16 +60,19 @@ final class KubernetesHttpClientFilter implements HttpClientFilter {
private static final Logger LOG = LoggerFactory.getLogger(KubernetesHttpClientFilter.class);

private final Provider<KubeConfig> kubeConfigProvider;
private final Provider<Collection<KubernetesTokenLoader>> kubernetesTokenLoaders;
private final Provider<Collection<TokenLoader>> tokenLoaders;
private final Scheduler scheduler;

KubernetesHttpClientFilter(Provider<KubeConfigLoader> kubeConfigLoader,
ApplicationContext applicationContext) {
ApplicationContext applicationContext,
@Named(TaskExecutors.BLOCKING) @Nullable ExecutorService executorService) {
// Retrieval has to be delegated to filtering, as any of these classes might
// depend on a client causing a circular dependency.
this.kubeConfigProvider = ProviderUtils.memoized(
() -> kubeConfigLoader.get().getKubeConfig());
this.kubernetesTokenLoaders = ProviderUtils.memoized(
() -> applicationContext.getBeansOfType(KubernetesTokenLoader.class));
this.tokenLoaders = ProviderUtils.memoized(
() -> applicationContext.getBeansOfType(TokenLoader.class));
this.scheduler = executorService == null ? null : Schedulers.fromExecutorService(executorService);
}

@Override
Expand All @@ -78,10 +89,10 @@ public Publisher<? extends HttpResponse<?>> doFilter(MutableHttpRequest<?> reque
return chain.proceed(request.basicAuth(user.username(), user.password()));
}
}
Collection<KubernetesTokenLoader> loaders = kubernetesTokenLoaders.get();
Collection<TokenLoader> loaders = tokenLoaders.get();
LOG.trace("Using token authentication, tokenLoaders={}", loaders);
return Flux.fromIterable(loaders)
.concatMap(KubernetesTokenLoader::getToken)
.concatMap(this::getToken)
.next()
.switchIfEmpty(Mono.just(StringUtils.EMPTY_STRING))
.doOnNext(token -> {
Expand All @@ -91,4 +102,18 @@ public Publisher<? extends HttpResponse<?>> doFilter(MutableHttpRequest<?> reque
})
.flatMapMany(token -> StringUtils.isEmpty(token) ? chain.proceed(request) : chain.proceed(request.bearerAuth(token)));
}

private Publisher<String> getToken(TokenLoader tokenLoader) {
if (tokenLoader instanceof ReactiveKubernetesTokenLoader reactiveTokenLoader) {
return reactiveTokenLoader.getToken();
} else if (tokenLoader instanceof KubernetesTokenLoader blockingTokenLoader) {
Mono<String> publisher = Mono.fromCallable(blockingTokenLoader::getToken);
if (scheduler != null) {
publisher = publisher.subscribeOn(scheduler);
}
return publisher.doOnNext(token -> LOG.trace("Token loaded by {}", blockingTokenLoader.getClass().getName()));
}
LOG.error("Found unknown token loader implementation: {}", tokenLoader.getClass().getName());
return Mono.empty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@
@Singleton
@BootstrapContextCompatible
@Internal
final class ExecCommandCredentialLoader implements KubernetesTokenLoader {
final class ExecCommandCredentialLoader implements ReactiveKubernetesTokenLoader {
private static final Logger LOG = LoggerFactory.getLogger(ExecCommandCredentialLoader.class);

private static final int ORDER = 10;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
@Singleton
@BootstrapContextCompatible
@Internal
final class KubeConfigTokenLoader implements KubernetesTokenLoader {
final class KubeConfigTokenLoader implements ReactiveKubernetesTokenLoader {
private static final Logger LOG = LoggerFactory.getLogger(KubeConfigTokenLoader.class);

private static final int ORDER = 20;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,17 @@
*/
package io.micronaut.kubernetes.client.openapi.credential;

import io.micronaut.core.annotation.NonNull;
import io.micronaut.core.order.Ordered;
import org.reactivestreams.Publisher;
import io.micronaut.core.annotation.Nullable;

/**
* The loader for bearer token used in kubernetes api service authentication.
*/
public interface KubernetesTokenLoader extends Ordered {
public interface KubernetesTokenLoader extends TokenLoader {

/**
* Gets a bearer token for request authentication.
*
* @return bearer token
*/
@NonNull Publisher<String> getToken();
@Nullable String getToken();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright 2017-2025 original authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.micronaut.kubernetes.client.openapi.credential;

import io.micronaut.core.annotation.NonNull;
import io.micronaut.core.async.annotation.SingleResult;
import org.reactivestreams.Publisher;

/**
* The loader for bearer token used in kubernetes api service authentication.
*/
public interface ReactiveKubernetesTokenLoader extends TokenLoader {

/**
* Gets a bearer token for request authentication.
*
* @return bearer token
*/
@SingleResult
@NonNull Publisher<String> getToken();
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
@BootstrapContextCompatible
@Requires(env = Environment.KUBERNETES)
@Requires(property = KubernetesClientConfiguration.PREFIX + ".service-account.enabled", value = StringUtils.TRUE, defaultValue = StringUtils.TRUE)
final class ServiceAccountTokenLoader implements KubernetesTokenLoader {
final class ServiceAccountTokenLoader implements ReactiveKubernetesTokenLoader {
private static final Logger LOG = LoggerFactory.getLogger(ServiceAccountTokenLoader.class);

private static final int ORDER = 30;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright 2017-2025 original authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.micronaut.kubernetes.client.openapi.credential;

import io.micronaut.core.annotation.Internal;
import io.micronaut.core.order.Ordered;

/**
* Interface for blocking and reactive kubernetes token loaders.
*/
@Internal
public interface TokenLoader extends Ordered {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
package io.micronaut.kubernetes.client.openapi

import io.micronaut.context.ApplicationContext
import io.micronaut.context.ProviderUtils
import io.micronaut.core.util.StringUtils
import io.micronaut.http.HttpMethod
import io.micronaut.http.HttpResponse
import io.micronaut.http.MutableHttpRequest
import io.micronaut.http.filter.ClientFilterChain
import io.micronaut.http.simple.SimpleHttpRequest
import io.micronaut.kubernetes.client.openapi.config.KubeConfig
import io.micronaut.kubernetes.client.openapi.config.KubeConfigLoader
import io.micronaut.kubernetes.client.openapi.credential.KubernetesTokenLoader
import io.micronaut.kubernetes.client.openapi.credential.ReactiveKubernetesTokenLoader
import io.micronaut.kubernetes.client.openapi.credential.TokenLoader
import org.reactivestreams.Publisher
import reactor.core.publisher.Mono
import spock.lang.Specification

class KubernetesHttpClientFilterSpec extends Specification {

private static final def BASE_MAP = ["current-context": "test-context"]
private static final def CONTEXT_MAP = [contexts: [[name: "test-context", context: [cluster: "test-cluster", user: "test-user"]]]]
private static final def CLUSTER_MAP = [clusters: [[name: "test-cluster", cluster: ["server": "test-server"]]]]
private static final def USER_MAP = [users: [[name: "test-user", user: ["username": "test-username", "password": "test-password"]]]]
private static final def KUBE_CONFIG_MAP = BASE_MAP + CONTEXT_MAP + CLUSTER_MAP + USER_MAP

ApplicationContext applicationContext
KubeConfigLoader kubeConfigLoader

def setup() {
applicationContext = Stub(ApplicationContext)
kubeConfigLoader = Stub(KubeConfigLoader)
}

def 'filter uses username and password authentication'() {
given:
def filter = new KubernetesHttpClientFilter(ProviderUtils.memoized(() -> kubeConfigLoader), applicationContext, null)
def request = new SimpleHttpRequest<String>(HttpMethod.GET, "/test", "value")
def filterChain = new CustomClientFilterChain()

and:
def config = new KubeConfig(KUBE_CONFIG_MAP)
kubeConfigLoader.getKubeConfig() >> config

when:
Mono.from(filter.doFilter(request, filterChain)).block()

then:
filterChain.getAuthHeaderValue() == "Basic " + new String(Base64.getEncoder().encode("test-username:test-password".getBytes()))
}

def 'filter uses token from blocking loader'() {
given:
def filter = new KubernetesHttpClientFilter(ProviderUtils.memoized(() -> kubeConfigLoader), applicationContext, null)
def request = new SimpleHttpRequest<String>(HttpMethod.GET, "/test", "value")
def filterChain = new CustomClientFilterChain()

and:
kubeConfigLoader.getKubeConfig() >> null

and:
def loader1 = new BlockingLoader("test1")
def loader2 = new ReactiveLoader("test2")
applicationContext.getBeansOfType(TokenLoader.class) >> [loader1, loader2]

when:
Mono.from(filter.doFilter(request, filterChain)).block()

then:
filterChain.getAuthHeaderValue() == "Bearer test1"
}

def 'filter uses token from reactive loader'() {
given:
def filter = new KubernetesHttpClientFilter(ProviderUtils.memoized(() -> kubeConfigLoader), applicationContext, null)
def request = new SimpleHttpRequest<String>(HttpMethod.GET, "/test", "value")
def filterChain = new CustomClientFilterChain()

and:
kubeConfigLoader.getKubeConfig() >> null

and:
def loader1 = new BlockingLoader(null)
def loader2 = new ReactiveLoader("test2")
applicationContext.getBeansOfType(TokenLoader.class) >> [loader1, loader2]

when:
Mono.from(filter.doFilter(request, filterChain)).block()

then:
filterChain.getAuthHeaderValue() == "Bearer test2"
}

class CustomClientFilterChain implements ClientFilterChain {
private String authHeaderValue

String getAuthHeaderValue() {
return authHeaderValue
}

@Override
Publisher<? extends HttpResponse<?>> proceed(MutableHttpRequest<?> request) {
authHeaderValue = request.getHeaders().getAuthorization().orElse(StringUtils.EMPTY_STRING)
return Mono.empty()
}
}

static class BlockingLoader implements KubernetesTokenLoader {
private final String token

private BlockingLoader(String token) {
this.token = token
}

@Override
String getToken() {
return token
}
}

static class ReactiveLoader implements ReactiveKubernetesTokenLoader {
private final String token

private ReactiveLoader(String token) {
this.token = token
}

@Override
Publisher<String> getToken() {
return Mono.fromCallable(() -> loadToken())
}

private String loadToken() {
return token
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import io.micronaut.http.annotation.Controller
import io.micronaut.http.annotation.Get
import io.micronaut.http.annotation.Header
import io.micronaut.kubernetes.client.openapi.api.CoreV1Api
import io.micronaut.kubernetes.client.openapi.credential.KubernetesTokenLoader
import io.micronaut.kubernetes.client.openapi.credential.ReactiveKubernetesTokenLoader
import io.micronaut.kubernetes.client.openapi.model.V1Pod
import io.micronaut.kubernetes.client.openapi.model.V1PodList
import io.micronaut.runtime.server.EmbeddedServer
Expand Down Expand Up @@ -96,7 +96,7 @@ current-context: test-context
@Singleton
@Requires(property = 'spec.name', value = 'ClientCredentialLoaderSpec-Client')
@BootstrapContextCompatible
static class FirstCredentialLoader implements KubernetesTokenLoader {
static class FirstCredentialLoader implements ReactiveKubernetesTokenLoader {

@Override
Publisher<String> getToken() {
Expand All @@ -112,7 +112,7 @@ current-context: test-context
@Singleton
@Requires(property = 'spec.name', value = 'ClientCredentialLoaderSpec-Client')
@BootstrapContextCompatible
static class SecondCredentialLoader implements KubernetesTokenLoader {
static class SecondCredentialLoader implements ReactiveKubernetesTokenLoader {

@Override
Publisher<String> getToken() {
Expand Down
5 changes: 3 additions & 2 deletions src/main/docs/guide/kubernetes-client-openapi.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ Then you can simply use Micronaut injection to get configured apis object from p

snippet::micronaut.client.PodController[project-base="examples/example-kubernetes-client-openapi-reactor", source="main"]

.Configuration
### Configuration

[configuration]
----
Expand Down Expand Up @@ -77,7 +77,8 @@ The client supports the following authentication strategies:
There are several interfaces that can be implemented to change default implementations:

* link:{api}/io/micronaut/kubernetes/client/openapi/config/KubeConfigLoader.html[KubeConfigLoader] - a custom implementation can be used when a kube config file needs to be loaded from a cloud service. There is also an option of extending link:{api}/io/micronaut/kubernetes/client/openapi/config/AbstractKubeConfigLoader.html[AbstractKubeConfigLoader] which caches the loaded kube config data and provides a few helper methods.
* link:{api}/io/micronaut/kubernetes/client/openapi/credential/KubernetesTokenLoader.html[KubernetesTokenLoader] - a custom implementation can be used for loading a bearer token. link:{api}/io/micronaut/kubernetes/client/openapi/KubernetesHttpClientFilter.html[KubernetesHttpClientFilter] iterates through a list of implementations of this interface and creates the Authorization header using the token from the first implementation which returns it. The following implementations are currently used:
* link:{api}/io/micronaut/kubernetes/client/openapi/credential/KubernetesTokenLoader.html[KubernetesTokenLoader] - a custom blocking implementation can be used for loading a bearer token. link:{api}/io/micronaut/kubernetes/client/openapi/KubernetesHttpClientFilter.html[KubernetesHttpClientFilter] iterates through a list of blocking and reactive implementations and creates the Authorization header using the token from the first implementation which returns it
* link:{api}/io/micronaut/kubernetes/client/openapi/credential/ReactiveKubernetesTokenLoader.html[ReactiveKubernetesTokenLoader] - a custom reactive implementation can be used for loading a bearer token. The client is using the following implementations by default:
** link:{api}/io/micronaut/kubernetes/client/openapi/credential/ExecCommandCredentialLoader.html[ExecCommandCredentialLoader] - implementation which executes the command from the kube config file to get the token.
** link:{api}/io/micronaut/kubernetes/client/openapi/credential/KubeConfigTokenLoader.html[KubeConfigTokenLoader] - implementation which uses the token from the kube config file.
** link:{api}/io/micronaut/kubernetes/client/openapi/credential/ServiceAccountTokenLoader.html[ServiceAccountTokenLoader] - implementation which uses the service account token.
Expand Down

0 comments on commit ef46344

Please sign in to comment.