Skip to content

Commit 242e7b3

Browse files
authored
feat(auto_balancer): assign partitions within rack on broker fence (AutoMQ#1778) (AutoMQ#1783)
Signed-off-by: Shichao Nie <niesc@automq.com>
1 parent 7d91cb7 commit 242e7b3

File tree

4 files changed

+86
-29
lines changed

4 files changed

+86
-29
lines changed

metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java

+2-3
Original file line numberDiff line numberDiff line change
@@ -849,10 +849,9 @@ public void replay(UpdateNextNodeIdRecord record) {
849849
nextNodeId.set(record.nodeId());
850850
}
851851

852-
public List<Integer> getActiveBrokers() {
852+
public List<BrokerRegistration> getActiveBrokers() {
853853
return brokerRegistrations.values().stream()
854-
.map(BrokerRegistration::id)
855-
.filter(this::isActive)
854+
.filter(b -> isActive(b.id()))
856855
.collect(Collectors.toList());
857856
}
858857
// AutoMQ inject end

metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -2115,6 +2115,7 @@ void generateLeaderAndIsrUpdates(String context,
21152115
IntPredicate isAcceptableLeader = fencing ? r -> false :
21162116
r -> (r != brokerToRemove) && (r == brokerToAdd || clusterControl.isActive(r));
21172117

2118+
BrokerRegistration brokerRegistrationToRemove = clusterControl.brokerRegistrations().get(brokerToRemove);
21182119
PartitionLeaderSelector partitionLeaderSelector = null;
21192120
// AutoMQ for Kafka inject end
21202121

@@ -2157,8 +2158,7 @@ void generateLeaderAndIsrUpdates(String context,
21572158
builder.setTargetNode(brokerToAdd);
21582159
} else {
21592160
if (partitionLeaderSelector == null) {
2160-
partitionLeaderSelector = new LoadAwarePartitionLeaderSelector(clusterControl.getActiveBrokers(),
2161-
brokerId -> brokerId != brokerToRemove);
2161+
partitionLeaderSelector = new LoadAwarePartitionLeaderSelector(clusterControl.getActiveBrokers(), brokerRegistrationToRemove);
21622162
}
21632163
partitionLeaderSelector
21642164
.select(new TopicPartition(topic.name(), topicIdPart.partitionId()))

metadata/src/main/java/org/apache/kafka/controller/es/LoadAwarePartitionLeaderSelector.java

+13-11
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,10 @@
1111

1212
package org.apache.kafka.controller.es;
1313

14+
import java.util.stream.Collectors;
1415
import java.util.Random;
1516
import org.apache.kafka.common.TopicPartition;
17+
import org.apache.kafka.metadata.BrokerRegistration;
1618
import org.apache.kafka.common.WeightedRandomList;
1719
import org.slf4j.Logger;
1820
import org.slf4j.LoggerFactory;
@@ -23,46 +25,46 @@
2325
import java.util.Map;
2426
import java.util.Optional;
2527
import java.util.Set;
26-
import java.util.function.Predicate;
2728

2829
public class LoadAwarePartitionLeaderSelector implements PartitionLeaderSelector {
2930
private static final Logger LOGGER = LoggerFactory.getLogger(LoadAwarePartitionLeaderSelector.class);
3031
private final WeightedRandomList<Integer> brokerLoads;
3132
private final RandomPartitionLeaderSelector randomSelector;
3233

33-
public LoadAwarePartitionLeaderSelector(List<Integer> aliveBrokers, Predicate<Integer> brokerPredicate) {
34-
this(new Random(), aliveBrokers, brokerPredicate);
34+
public LoadAwarePartitionLeaderSelector(List<BrokerRegistration> aliveBrokers, BrokerRegistration brokerToRemove) {
35+
this(new Random(), aliveBrokers, brokerToRemove);
3536
}
3637

37-
public LoadAwarePartitionLeaderSelector(Random r, List<Integer> aliveBrokers, Predicate<Integer> brokerPredicate) {
38+
public LoadAwarePartitionLeaderSelector(Random r, List<BrokerRegistration> aliveBrokers, BrokerRegistration brokerToRemove) {
3839
Map<Integer, Double> brokerLoadMap = ClusterStats.getInstance().brokerLoads();
3940
if (brokerLoadMap == null) {
4041
this.brokerLoads = null;
4142
LOGGER.warn("No broker loads available, using random partition leader selector");
42-
this.randomSelector = new RandomPartitionLeaderSelector(aliveBrokers, brokerPredicate);
43+
this.randomSelector = new RandomPartitionLeaderSelector(aliveBrokers.stream().map(BrokerRegistration::id).collect(Collectors.toList()), brokerId -> brokerId != brokerToRemove.id());
4344
return;
4445
}
4546
Set<Integer> excludedBrokers = ClusterStats.getInstance().excludedBrokers();
4647
if (excludedBrokers == null) {
4748
excludedBrokers = new HashSet<>();
4849
}
49-
List<Integer> availableBrokers = new ArrayList<>();
50-
for (int broker : aliveBrokers) {
51-
if (!excludedBrokers.contains(broker)) {
50+
List<BrokerRegistration> availableBrokers = new ArrayList<>();
51+
for (BrokerRegistration broker : aliveBrokers) {
52+
if (!excludedBrokers.contains(broker.id()) && broker.id() != brokerToRemove.id()) {
5253
availableBrokers.add(broker);
5354
}
5455
}
5556
brokerLoads = new WeightedRandomList<>(r);
56-
for (int brokerId : availableBrokers) {
57-
if (!brokerPredicate.test(brokerId) || !brokerLoadMap.containsKey(brokerId)) {
57+
for (BrokerRegistration broker : availableBrokers) {
58+
int brokerId = broker.id();
59+
if (!broker.rack().equals(brokerToRemove.rack()) || !brokerLoadMap.containsKey(brokerId)) {
5860
continue;
5961
}
6062
double load = Math.max(1, brokerLoadMap.get(brokerId));
6163
// allocation weight is inversely proportional to the load
6264
brokerLoads.add(new WeightedRandomList.Entity<>(brokerId, 1 / load));
6365
}
6466
brokerLoads.update();
65-
this.randomSelector = new RandomPartitionLeaderSelector(availableBrokers, brokerPredicate);
67+
this.randomSelector = new RandomPartitionLeaderSelector(availableBrokers.stream().map(BrokerRegistration::id).collect(Collectors.toList()), id -> true);
6668
}
6769

6870
@Override

metadata/src/test/java/org/apache/kafka/controller/es/LoadAwarePartitionLeaderSelectorTest.java

+69-13
Original file line numberDiff line numberDiff line change
@@ -11,32 +11,48 @@
1111

1212
package org.apache.kafka.controller.es;
1313

14+
import java.util.Collections;
15+
import java.util.Optional;
16+
import java.util.stream.Collectors;
1417
import org.apache.kafka.common.TopicPartition;
18+
import org.apache.kafka.metadata.BrokerRegistration;
1519
import org.apache.kafka.server.util.MockRandom;
20+
import org.junit.jupiter.api.AfterEach;
1621
import org.junit.jupiter.api.Assertions;
1722
import org.junit.jupiter.api.Test;
1823

1924
import java.util.HashMap;
20-
import java.util.HashSet;
2125
import java.util.List;
2226
import java.util.Map;
2327
import java.util.Set;
2428

2529
public class LoadAwarePartitionLeaderSelectorTest {
2630

31+
@AfterEach
32+
public void tearDown() {
33+
ClusterStats.getInstance().updateExcludedBrokers(Collections.emptySet());
34+
ClusterStats.getInstance().updateBrokerLoads(Collections.emptyMap());
35+
ClusterStats.getInstance().updatePartitionLoads(Collections.emptyMap());
36+
}
37+
2738
@Test
2839
public void testLoadAwarePartitionLeaderSelector() {
29-
List<Integer> aliveBrokers = List.of(0, 1, 2, 3, 4, 5);
30-
Set<Integer> brokerSet = new HashSet<>(aliveBrokers);
31-
int brokerToRemove = 5;
40+
List<BrokerRegistration> aliveBrokers = List.of(
41+
new BrokerRegistration.Builder().setId(0).build(),
42+
new BrokerRegistration.Builder().setId(1).build(),
43+
new BrokerRegistration.Builder().setId(2).build(),
44+
new BrokerRegistration.Builder().setId(3).build(),
45+
new BrokerRegistration.Builder().setId(4).build(),
46+
new BrokerRegistration.Builder().setId(5).build());
47+
Set<Integer> brokerSet = aliveBrokers.stream().map(BrokerRegistration::id).collect(Collectors.toSet());
48+
BrokerRegistration brokerToRemove = aliveBrokers.get(aliveBrokers.size() - 1);
3249
MockRandom random = new MockRandom();
33-
LoadAwarePartitionLeaderSelector loadAwarePartitionLeaderSelector = new LoadAwarePartitionLeaderSelector(random,
34-
aliveBrokers, broker -> broker != brokerToRemove);
50+
LoadAwarePartitionLeaderSelector loadAwarePartitionLeaderSelector = new LoadAwarePartitionLeaderSelector(random, aliveBrokers, brokerToRemove);
3551

3652
// fallback to random selector
3753
setUpCluster();
3854
Map<Integer, Double> brokerLoads = new HashMap<>();
39-
randomSelect(loadAwarePartitionLeaderSelector, 2000, brokerSet, brokerToRemove, brokerLoads);
55+
randomSelect(loadAwarePartitionLeaderSelector, 2000, brokerSet, brokerToRemove.id(), brokerLoads);
4056
Assertions.assertEquals(4000, brokerLoads.get(0));
4157
Assertions.assertEquals(4000, brokerLoads.get(1));
4258
Assertions.assertEquals(4000, brokerLoads.get(2));
@@ -45,8 +61,8 @@ public void testLoadAwarePartitionLeaderSelector() {
4561

4662
// load aware selector
4763
brokerLoads = setUpCluster();
48-
loadAwarePartitionLeaderSelector = new LoadAwarePartitionLeaderSelector(random, aliveBrokers, broker -> broker != brokerToRemove);
49-
randomSelect(loadAwarePartitionLeaderSelector, 2000, brokerSet, brokerToRemove, brokerLoads);
64+
loadAwarePartitionLeaderSelector = new LoadAwarePartitionLeaderSelector(random, aliveBrokers, brokerToRemove);
65+
randomSelect(loadAwarePartitionLeaderSelector, 2000, brokerSet, brokerToRemove.id(), brokerLoads);
5066
Assertions.assertEquals(5990, brokerLoads.get(0));
5167
Assertions.assertEquals(7660, brokerLoads.get(1));
5268
Assertions.assertEquals(6720, brokerLoads.get(2));
@@ -57,8 +73,8 @@ public void testLoadAwarePartitionLeaderSelector() {
5773
brokerLoads = setUpCluster();
5874
brokerLoads.remove(1);
5975
ClusterStats.getInstance().updateBrokerLoads(brokerLoads);
60-
loadAwarePartitionLeaderSelector = new LoadAwarePartitionLeaderSelector(random, aliveBrokers, broker -> broker != brokerToRemove);
61-
randomSelect(loadAwarePartitionLeaderSelector, 2000, brokerSet, brokerToRemove, brokerLoads);
76+
loadAwarePartitionLeaderSelector = new LoadAwarePartitionLeaderSelector(random, aliveBrokers, brokerToRemove);
77+
randomSelect(loadAwarePartitionLeaderSelector, 2000, brokerSet, brokerToRemove.id(), brokerLoads);
6278
Assertions.assertEquals(6840, brokerLoads.get(0));
6379
Assertions.assertEquals(7280, brokerLoads.get(2));
6480
Assertions.assertEquals(7950, brokerLoads.get(3));
@@ -67,8 +83,8 @@ public void testLoadAwarePartitionLeaderSelector() {
6783
// tests exclude broker
6884
brokerLoads = setUpCluster();
6985
ClusterStats.getInstance().updateExcludedBrokers(Set.of(1));
70-
loadAwarePartitionLeaderSelector = new LoadAwarePartitionLeaderSelector(random, aliveBrokers, broker -> broker != brokerToRemove);
71-
randomSelect(loadAwarePartitionLeaderSelector, 2000, brokerSet, brokerToRemove, brokerLoads);
86+
loadAwarePartitionLeaderSelector = new LoadAwarePartitionLeaderSelector(random, aliveBrokers, brokerToRemove);
87+
randomSelect(loadAwarePartitionLeaderSelector, 2000, brokerSet, brokerToRemove.id(), brokerLoads);
7288
Assertions.assertEquals(6970, brokerLoads.get(0));
7389
Assertions.assertEquals(5000, brokerLoads.get(1));
7490
Assertions.assertEquals(7210, brokerLoads.get(2));
@@ -93,6 +109,46 @@ private void randomSelect(LoadAwarePartitionLeaderSelector selector, int count,
93109
}
94110
}
95111

112+
@Test
113+
public void testLoadAwarePartitionLeaderSelectorWithRack() {
114+
String rackA = "rack-a";
115+
String rackB = "rack-b";
116+
List<BrokerRegistration> aliveBrokers = List.of(
117+
new BrokerRegistration.Builder().setId(0).setRack(Optional.of(rackA)).build(),
118+
new BrokerRegistration.Builder().setId(1).setRack(Optional.of(rackB)).build(),
119+
new BrokerRegistration.Builder().setId(2).setRack(Optional.of(rackB)).build(),
120+
new BrokerRegistration.Builder().setId(3).setRack(Optional.of(rackB)).build(),
121+
new BrokerRegistration.Builder().setId(4).setRack(Optional.of(rackB)).build(),
122+
new BrokerRegistration.Builder().setId(5).setRack(Optional.of(rackB)).build());
123+
124+
Set<Integer> brokerSet = aliveBrokers.stream().map(BrokerRegistration::id).collect(Collectors.toSet());
125+
setUpCluster();
126+
BrokerRegistration brokerToRemove = aliveBrokers.get(0);
127+
MockRandom random = new MockRandom();
128+
LoadAwarePartitionLeaderSelector loadAwarePartitionLeaderSelector = new LoadAwarePartitionLeaderSelector(random, aliveBrokers, brokerToRemove);
129+
130+
// fallback to random selector
131+
Map<Integer, Double> brokerLoads = new HashMap<>();
132+
randomSelect(loadAwarePartitionLeaderSelector, 2000, brokerSet, brokerToRemove.id(), brokerLoads);
133+
Assertions.assertEquals(4000, brokerLoads.get(1));
134+
Assertions.assertEquals(4000, brokerLoads.get(2));
135+
Assertions.assertEquals(4000, brokerLoads.get(3));
136+
Assertions.assertEquals(4000, brokerLoads.get(4));
137+
Assertions.assertEquals(4000, brokerLoads.get(5));
138+
139+
// load aware selector
140+
brokerLoads = setUpCluster();
141+
brokerToRemove = aliveBrokers.get(1);
142+
loadAwarePartitionLeaderSelector = new LoadAwarePartitionLeaderSelector(random, aliveBrokers, brokerToRemove);
143+
randomSelect(loadAwarePartitionLeaderSelector, 2000, brokerSet, brokerToRemove.id(), brokerLoads);
144+
Assertions.assertEquals(0, brokerLoads.get(0));
145+
Assertions.assertEquals(5000, brokerLoads.get(1));
146+
Assertions.assertEquals(7330, brokerLoads.get(2));
147+
Assertions.assertEquals(7840, brokerLoads.get(3));
148+
Assertions.assertEquals(7120, brokerLoads.get(4));
149+
Assertions.assertEquals(6710, brokerLoads.get(5));
150+
}
151+
96152
private Map<Integer, Double> setUpCluster() {
97153
Map<Integer, Double> brokerLoads = new HashMap<>();
98154
brokerLoads.put(0, 0.0);

0 commit comments

Comments
 (0)