Skip to content

Commit

Permalink
WIP: policy middleware
Browse files Browse the repository at this point in the history
  • Loading branch information
jenschude committed Dec 5, 2023
1 parent ad84312 commit 79dcaf8
Show file tree
Hide file tree
Showing 2 changed files with 119 additions and 1 deletion.
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@

package com.commercetools.http.reactive;

import java.time.Duration;
import java.util.function.Function;

import io.vrap.rmf.base.client.ApiHttpResponse;
import io.vrap.rmf.base.client.ClientBuilder;
import io.vrap.rmf.base.client.http.*;
import io.vrap.rmf.base.client.http.PolicyMiddleware;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import dev.failsafe.Bulkhead;
import dev.failsafe.Timeout;
import reactor.core.publisher.Mono;
import reactor.util.retry.RetrySpec;

/**
* <h2>PolicyBuilder</h2>
*
* <p>The PolicyBuilder allows the combination of different policies for failing requests.</p>
*
* <p>The order of policies matters. For example applying a {@link #withTimeout(Duration) timeout} before
* {@link #withRetry(io.vrap.rmf.base.client.http.RetryPolicyBuilder)} retry} will time out across all requests whereas applying a timeout after the retry count
* against every single request even the retried ones.</p>
*
* <h3 id="retry">Retry</h3>
*
* <h4>Retrying on HTTP status codes</h4>
*
* {@include.example io.vrap.rmf.base.client.http.PolicyMiddlewareTest#retryConfigurationStatusCodes()}
*
* <h3>Retrying specific exceptions</h3>
*
* {@include.example io.vrap.rmf.base.client.http.PolicyMiddlewareTest#retryConfigurationExceptions()}
*
* <h3 id="timeout">Timeout</h3>
*
* {@include.example io.vrap.rmf.base.client.http.PolicyMiddlewareTest#timeoutConfiguration()}
*
* <h3 id="bulkhead">Bulkhead</h3>
*
* <p>Implementation of a Queue to limit the number of concurrent requests handled by the client</p>
*
* {@include.example io.vrap.rmf.base.client.http.PolicyMiddlewareTest#queueConfiguration()}
*/
public class PolicyBuilder {

static final String loggerName = ClientBuilder.COMMERCETOOLS + ".retry";

private static final InternalLogger logger = InternalLogger.getLogger(loggerName);
private static final Logger classLogger = LoggerFactory
.getLogger(io.vrap.rmf.base.client.http.PolicyMiddleware.class);

private final Function<Mono<ApiHttpResponse<byte[]>>, Mono<ApiHttpResponse<byte[]>>> policyFn;

public PolicyBuilder() {
this.policyFn = Function.identity();
}

public PolicyBuilder(Function<Mono<ApiHttpResponse<byte[]>>, Mono<ApiHttpResponse<byte[]>>> fn) {
this.policyFn = fn;
}

public PolicyBuilder withTimeout(final Duration duration) {
return withPolicy((result) -> result.timeout(duration));
}

public PolicyBuilder withRetry(final RetrySpec retrySpec) {
return withPolicy((result) -> result.retryWhen(retrySpec));
}

public PolicyBuilder withPolicy(Function<Mono<ApiHttpResponse<byte[]>>, Mono<ApiHttpResponse<byte[]>>> fn) {

return new PolicyBuilder(fn.andThen(policyFn));
}

public io.vrap.rmf.base.client.http.PolicyMiddleware build() {
return PolicyMiddleware.of(scheduler, policies);
}

public static PolicyBuilder of() {
return new PolicyBuilder();
}

public static Timeout<ApiHttpResponse<byte[]>> timeout(final Duration duration) {
return timeout(duration, options -> options);
}

public static Timeout<ApiHttpResponse<byte[]>> timeout(final Duration duration,
final FailsafeTimeoutBuilderOptions fn) {
return fn.apply(Timeout.builder(duration)).build();
}

public static Bulkhead<ApiHttpResponse<byte[]>> bulkhead(final int maxConcurrency) {
return bulkhead(maxConcurrency, options -> options);
}

public static Bulkhead<ApiHttpResponse<byte[]>> bulkhead(final int maxConcurrency, final Duration maxWaitTime) {
return bulkhead(maxConcurrency, maxWaitTime, options -> options);
}

public static Bulkhead<ApiHttpResponse<byte[]>> bulkhead(final int maxConcurrency,
final FailsafeConcurrencyBuilderOptions fn) {
return fn.apply(Bulkhead.builder(maxConcurrency)).build();
}

public static Bulkhead<ApiHttpResponse<byte[]>> bulkhead(final int maxConcurrency, final Duration maxWaitTime,
final FailsafeConcurrencyBuilderOptions fn) {
return fn.apply(Bulkhead.<ApiHttpResponse<byte[]>> builder(maxConcurrency).withMaxWaitTime(maxWaitTime))
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,12 @@

public class PolicyMiddlewareImpl implements PolicyMiddleware {

Function<Mono<ApiHttpResponse<byte[]>>, Mono<ApiHttpResponse<byte[]>>> policyFn;

@Override
public Publisher<ApiHttpResponse<byte[]>> invoke(ApiHttpRequest request,
Function<ApiHttpRequest, Publisher<ApiHttpResponse<byte[]>>> next) {
return Mono.from(next.apply(request));

return policyFn.apply(Mono.from(next.apply(request)));
}
}

0 comments on commit 79dcaf8

Please sign in to comment.