Skip to content

Commit 629f3b0

Browse files
authored
Add LoadBalancer for generalizing EndpointSelector (#5779)
Motivation: A load-balancing strategy such as round robin can be used in `EndpointSelector` and elsewhere. For example, in the event loop scheduler, requests can be distributed using round robin to determine which event loop to use. This PR is preliminary work to resolve #5289 and #5537 Modifications: - `LoadBalancer<T, C>` is the root interface all load balancers should implement. - `T` is the type of a candidate selected by strategies. - `C` is the type of context that is used when selecting a candidate. - `UpdatableLoadBalancer<T, C>` is a stateful load balancer to which new endpoints are updated. `RampingUpLoadBalancer` is the only implementation for `UpdatableLoadBalancer`. Other load balances will be re-created when new endpoints are added because they can always be reconstructed for the same results. - `Weighted` is a new API that represents the weight of an object. - If an object is `Weighted`, a weight function is not necessary when creating weighted-based load balancers. - `Endpoint` now implements `Weighted`. - `EndpointSelectionStategy` uses `DefaultEndpointSelector` to create a `LoadBalancer<Endpoint, ClientRequestContext>` internally and delegates the selection logic to it. - Each `EndpointSelectionStategy` implements `LoadBalancerFactory` to update the existing `LoadBalancer` or create a new `LoadBalancer` when endpoints are updated. - The following implementations are migrated from `**Strategy`. Except for `RampingUpLoadBalancer` which has some minor changes, most of the logic was ported as is. - `RampingUpLoadBalancer` - `Weight` prefix is dropped for simplicity. There may be no problem conveying the behavior. - Refactored to use a lock to guarantee thread-safety and sequential access. - A `RampingUpLoadBalancer` is now created from a list of candidates. If an executor is used to build the initial state, null is returned right after it is created. - `AbstractRampingUpLoadBalancerBuilder` is added to share common code for `RampingUpLoadBalancerBuilder` and `WeightRampingUpStrategyBuilder` - Fixed xDS implementations to use the new API when implementing load balancing strategies. - Deprecation) `EndpointWeightTransition` in favor of `WeightTransition` Result: - You can now create `LoadBalancer` using various load balancing strategies to select an element from a list of candidates. ```java List<Endpoint> candidates = ...; LoadBalancer.ofRoundRobin(candidates); LoadBalancer.ofWeightedRoundRobin(candidates); LoadBalancer.ofSticky(candidates, contextHasher); LoadBalancer.ofWeightedRandom(candidates); LoadBalancer.ofRampingUp(candidates); ```
1 parent 98e3054 commit 629f3b0

40 files changed

+2705
-1100
lines changed

core/src/main/java/com/linecorp/armeria/client/Endpoint.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555
import com.linecorp.armeria.common.SessionProtocol;
5656
import com.linecorp.armeria.common.annotation.Nullable;
5757
import com.linecorp.armeria.common.annotation.UnstableApi;
58+
import com.linecorp.armeria.common.loadbalancer.Weighted;
5859
import com.linecorp.armeria.common.util.DomainSocketAddress;
5960
import com.linecorp.armeria.common.util.UnmodifiableFuture;
6061
import com.linecorp.armeria.internal.common.ArmeriaHttpUtil;
@@ -72,7 +73,7 @@
7273
* represented as {@code "<host>"} or {@code "<host>:<port>"} in the authority part of a URI. It can have
7374
* an IP address if the host name has been resolved and thus there's no need to query a DNS server.</p>
7475
*/
75-
public final class Endpoint implements Comparable<Endpoint>, EndpointGroup {
76+
public final class Endpoint implements Comparable<Endpoint>, EndpointGroup, Weighted {
7677

7778
private static final Comparator<Endpoint> COMPARATOR =
7879
Comparator.comparing(Endpoint::host)
@@ -652,6 +653,7 @@ public Endpoint withWeight(int weight) {
652653
/**
653654
* Returns the weight of this endpoint.
654655
*/
656+
@Override
655657
public int weight() {
656658
return weight;
657659
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
/*
2+
* Copyright 2024 LINE Corporation
3+
*
4+
* LINE Corporation licenses this file to you under the Apache License,
5+
* version 2.0 (the "License"); you may not use this file except in compliance
6+
* with the License. You may obtain a copy of the License at:
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations
14+
* under the License.
15+
*/
16+
17+
package com.linecorp.armeria.client.endpoint;
18+
19+
import java.util.List;
20+
21+
import com.linecorp.armeria.client.ClientRequestContext;
22+
import com.linecorp.armeria.client.Endpoint;
23+
import com.linecorp.armeria.common.annotation.Nullable;
24+
import com.linecorp.armeria.common.loadbalancer.LoadBalancer;
25+
import com.linecorp.armeria.common.util.ListenableAsyncCloseable;
26+
import com.linecorp.armeria.internal.common.util.ReentrantShortLock;
27+
28+
final class DefaultEndpointSelector<T extends LoadBalancer<Endpoint, ClientRequestContext>>
29+
extends AbstractEndpointSelector {
30+
31+
private final LoadBalancerFactory<T> loadBalancerFactory;
32+
@Nullable
33+
private volatile T loadBalancer;
34+
private boolean closed;
35+
private final ReentrantShortLock lock = new ReentrantShortLock();
36+
37+
DefaultEndpointSelector(EndpointGroup endpointGroup,
38+
LoadBalancerFactory<T> loadBalancerFactory) {
39+
super(endpointGroup);
40+
this.loadBalancerFactory = loadBalancerFactory;
41+
if (endpointGroup instanceof ListenableAsyncCloseable) {
42+
((ListenableAsyncCloseable) endpointGroup).whenClosed().thenAccept(unused -> {
43+
lock.lock();
44+
try {
45+
closed = true;
46+
final T loadBalancer = this.loadBalancer;
47+
if (loadBalancer != null) {
48+
loadBalancer.close();
49+
}
50+
} finally {
51+
lock.unlock();
52+
}
53+
});
54+
}
55+
initialize();
56+
}
57+
58+
@Override
59+
protected void updateNewEndpoints(List<Endpoint> endpoints) {
60+
lock.lock();
61+
try {
62+
if (closed) {
63+
return;
64+
}
65+
loadBalancer = loadBalancerFactory.newLoadBalancer(loadBalancer, endpoints);
66+
} finally {
67+
lock.unlock();
68+
}
69+
}
70+
71+
@Nullable
72+
@Override
73+
public Endpoint selectNow(ClientRequestContext ctx) {
74+
final T loadBalancer = this.loadBalancer;
75+
if (loadBalancer == null) {
76+
return null;
77+
}
78+
return loadBalancer.pick(ctx);
79+
}
80+
81+
@FunctionalInterface
82+
interface LoadBalancerFactory<T> {
83+
T newLoadBalancer(@Nullable T oldLoadBalancer, List<Endpoint> candidates);
84+
85+
@SuppressWarnings("unchecked")
86+
default T unsafeCast(LoadBalancer<Endpoint, ?> loadBalancer) {
87+
return (T) loadBalancer;
88+
}
89+
}
90+
}

core/src/main/java/com/linecorp/armeria/client/endpoint/EndpointSelectionStrategy.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import com.linecorp.armeria.client.Endpoint;
2323
import com.linecorp.armeria.common.HttpRequest;
2424
import com.linecorp.armeria.common.annotation.UnstableApi;
25+
import com.linecorp.armeria.common.loadbalancer.WeightTransition;
2526

2627
/**
2728
* {@link Endpoint} selection strategy that creates a {@link EndpointSelector}.
@@ -53,7 +54,7 @@ static EndpointSelectionStrategy roundRobin() {
5354

5455
/**
5556
* Returns a weight ramping up {@link EndpointSelectionStrategy} which ramps the weight of newly added
56-
* {@link Endpoint}s using {@link EndpointWeightTransition#linear()}. The {@link Endpoint} is selected
57+
* {@link Endpoint}s using {@link WeightTransition#linear()}. The {@link Endpoint} is selected
5758
* using weighted random distribution.
5859
* The weights of {@link Endpoint}s are ramped up by 10 percent every 2 seconds up to 100 percent
5960
* by default. If you want to customize the parameters, use {@link #builderForRampingUp()}.

core/src/main/java/com/linecorp/armeria/client/endpoint/EndpointWeightTransition.java

+15-15
Original file line numberDiff line numberDiff line change
@@ -16,25 +16,31 @@
1616
package com.linecorp.armeria.client.endpoint;
1717

1818
import static com.google.common.base.Preconditions.checkArgument;
19-
import static com.linecorp.armeria.client.endpoint.WeightRampingUpStrategyBuilder.DEFAULT_LINEAR_TRANSITION;
20-
21-
import com.google.common.primitives.Ints;
2219

2320
import com.linecorp.armeria.client.Endpoint;
21+
import com.linecorp.armeria.common.loadbalancer.WeightTransition;
2422

2523
/**
2624
* Computes the weight of the given {@link Endpoint} using the given {@code currentStep}
2725
* and {@code totalSteps}.
26+
*
27+
* @deprecated Use {@link WeightTransition} instead.
2828
*/
29+
@Deprecated
2930
@FunctionalInterface
3031
public interface EndpointWeightTransition {
3132

3233
/**
3334
* Returns the {@link EndpointWeightTransition} which returns the gradually increased weight as the current
3435
* step increases.
36+
*
37+
* @deprecated Use {@link WeightTransition#linear()} instead.
3538
*/
39+
@Deprecated
3640
static EndpointWeightTransition linear() {
37-
return DEFAULT_LINEAR_TRANSITION;
41+
return (endpoint, currentStep, totalSteps) -> {
42+
return WeightTransition.linear().compute(endpoint, endpoint.weight(), currentStep, totalSteps);
43+
};
3844
}
3945

4046
/**
@@ -44,24 +50,18 @@ static EndpointWeightTransition linear() {
4450
* Refer to the following
4551
* <a href="https://www.envoyproxy.io/docs/envoy/latest/intro/arch_overview/upstream/load_balancing/slow_start">link</a>
4652
* for more information.
53+
*
54+
* @deprecated Use {@link WeightTransition#aggression(double, double)} instead.
4755
*/
56+
@Deprecated
4857
static EndpointWeightTransition aggression(double aggression, double minWeightPercent) {
4958
checkArgument(aggression > 0,
5059
"aggression: %s (expected: > 0.0)", aggression);
5160
checkArgument(minWeightPercent >= 0 && minWeightPercent <= 1.0,
5261
"minWeightPercent: %s (expected: >= 0.0, <= 1.0)", minWeightPercent);
53-
final int aggressionPercentage = Ints.saturatedCast(Math.round(aggression * 100));
54-
final double invertedAggression = 100.0 / aggressionPercentage;
5562
return (endpoint, currentStep, totalSteps) -> {
56-
final int weight = endpoint.weight();
57-
final int minWeight = Ints.saturatedCast(Math.round(weight * minWeightPercent));
58-
final int computedWeight;
59-
if (aggressionPercentage == 100) {
60-
computedWeight = linear().compute(endpoint, currentStep, totalSteps);
61-
} else {
62-
computedWeight = (int) (weight * Math.pow(1.0 * currentStep / totalSteps, invertedAggression));
63-
}
64-
return Math.max(computedWeight, minWeight);
63+
return WeightTransition.aggression(aggression, minWeightPercent)
64+
.compute(endpoint, endpoint.weight(), currentStep, totalSteps);
6565
};
6666
}
6767

core/src/main/java/com/linecorp/armeria/client/endpoint/RoundRobinStrategy.java

+11-38
Original file line numberDiff line numberDiff line change
@@ -17,54 +17,27 @@
1717
package com.linecorp.armeria.client.endpoint;
1818

1919
import java.util.List;
20-
import java.util.concurrent.atomic.AtomicInteger;
21-
22-
import com.google.common.base.MoreObjects;
2320

2421
import com.linecorp.armeria.client.ClientRequestContext;
2522
import com.linecorp.armeria.client.Endpoint;
23+
import com.linecorp.armeria.client.endpoint.DefaultEndpointSelector.LoadBalancerFactory;
2624
import com.linecorp.armeria.common.annotation.Nullable;
25+
import com.linecorp.armeria.common.loadbalancer.LoadBalancer;
2726

28-
final class RoundRobinStrategy implements EndpointSelectionStrategy {
29-
30-
static final RoundRobinStrategy INSTANCE = new RoundRobinStrategy();
27+
enum RoundRobinStrategy
28+
implements EndpointSelectionStrategy,
29+
LoadBalancerFactory<LoadBalancer<Endpoint, ClientRequestContext>> {
3130

32-
private RoundRobinStrategy() {}
31+
INSTANCE;
3332

3433
@Override
3534
public EndpointSelector newSelector(EndpointGroup endpointGroup) {
36-
return new RoundRobinSelector(endpointGroup);
35+
return new DefaultEndpointSelector<>(endpointGroup, this);
3736
}
3837

39-
/**
40-
* A round robin select strategy.
41-
*
42-
* <p>For example, with node a, b and c, then select result is abc abc ...
43-
*/
44-
static class RoundRobinSelector extends AbstractEndpointSelector {
45-
private final AtomicInteger sequence = new AtomicInteger();
46-
47-
RoundRobinSelector(EndpointGroup endpointGroup) {
48-
super(endpointGroup);
49-
initialize();
50-
}
51-
52-
@Nullable
53-
@Override
54-
public Endpoint selectNow(ClientRequestContext ctx) {
55-
final List<Endpoint> endpoints = group().endpoints();
56-
if (endpoints.isEmpty()) {
57-
return null;
58-
}
59-
final int currentSequence = sequence.getAndIncrement();
60-
return endpoints.get(Math.abs(currentSequence % endpoints.size()));
61-
}
62-
63-
@Override
64-
public String toString() {
65-
return MoreObjects.toStringHelper(this)
66-
.add("endpoints", group().endpoints())
67-
.toString();
68-
}
38+
@Override
39+
public LoadBalancer<Endpoint, ClientRequestContext> newLoadBalancer(
40+
@Nullable LoadBalancer<Endpoint, ClientRequestContext> oldLoadBalancer, List<Endpoint> candidates) {
41+
return unsafeCast(LoadBalancer.ofRoundRobin(candidates));
6942
}
7043
}

core/src/main/java/com/linecorp/armeria/client/endpoint/StickyEndpointSelectionStrategy.java

+11-39
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,12 @@
2020
import java.util.List;
2121
import java.util.function.ToLongFunction;
2222

23-
import com.google.common.base.MoreObjects;
24-
import com.google.common.hash.Hashing;
25-
2623
import com.linecorp.armeria.client.ClientRequestContext;
2724
import com.linecorp.armeria.client.Endpoint;
25+
import com.linecorp.armeria.client.endpoint.DefaultEndpointSelector.LoadBalancerFactory;
2826
import com.linecorp.armeria.common.HttpRequest;
2927
import com.linecorp.armeria.common.annotation.Nullable;
28+
import com.linecorp.armeria.common.loadbalancer.LoadBalancer;
3029

3130
/**
3231
* An {@link EndpointSelector} strategy which implements sticky load-balancing using
@@ -46,7 +45,9 @@
4645
* final StickyEndpointSelectionStrategy strategy = new StickyEndpointSelectionStrategy(hasher);
4746
* }</pre>
4847
*/
49-
final class StickyEndpointSelectionStrategy implements EndpointSelectionStrategy {
48+
final class StickyEndpointSelectionStrategy
49+
implements EndpointSelectionStrategy,
50+
LoadBalancerFactory<LoadBalancer<Endpoint, ClientRequestContext>> {
5051

5152
private final ToLongFunction<? super ClientRequestContext> requestContextHasher;
5253

@@ -61,45 +62,16 @@ final class StickyEndpointSelectionStrategy implements EndpointSelectionStrategy
6162
}
6263

6364
/**
64-
* Creates a new {@link StickyEndpointSelector}.
65-
*
66-
* @param endpointGroup an {@link EndpointGroup}
67-
* @return a new {@link StickyEndpointSelector}
65+
* Creates a new sticky {@link EndpointSelector}.
6866
*/
6967
@Override
7068
public EndpointSelector newSelector(EndpointGroup endpointGroup) {
71-
return new StickyEndpointSelector(endpointGroup, requestContextHasher);
69+
return new DefaultEndpointSelector<>(endpointGroup, this);
7270
}
7371

74-
private static final class StickyEndpointSelector extends AbstractEndpointSelector {
75-
76-
private final ToLongFunction<? super ClientRequestContext> requestContextHasher;
77-
78-
StickyEndpointSelector(EndpointGroup endpointGroup,
79-
ToLongFunction<? super ClientRequestContext> requestContextHasher) {
80-
super(endpointGroup);
81-
this.requestContextHasher = requireNonNull(requestContextHasher, "requestContextHasher");
82-
initialize();
83-
}
84-
85-
@Nullable
86-
@Override
87-
public Endpoint selectNow(ClientRequestContext ctx) {
88-
final List<Endpoint> endpoints = group().endpoints();
89-
if (endpoints.isEmpty()) {
90-
return null;
91-
}
92-
93-
final long key = requestContextHasher.applyAsLong(ctx);
94-
final int nearest = Hashing.consistentHash(key, endpoints.size());
95-
return endpoints.get(nearest);
96-
}
97-
98-
@Override
99-
public String toString() {
100-
return MoreObjects.toStringHelper(this)
101-
.add("endpoints", group().endpoints())
102-
.toString();
103-
}
72+
@Override
73+
public LoadBalancer<Endpoint, ClientRequestContext> newLoadBalancer(
74+
@Nullable LoadBalancer<Endpoint, ClientRequestContext> oldLoadBalancer, List<Endpoint> candidates) {
75+
return LoadBalancer.ofSticky(candidates, requestContextHasher);
10476
}
10577
}

0 commit comments

Comments
 (0)