Skip to content

Commit

Permalink
core: Alternate ipV4 and ipV6 addresses for Happy Eyeballs in PickFir…
Browse files Browse the repository at this point in the history
…stLeafLoadBalancer (grpc#11624)

* Interweave ipV4 and ipV6 addresses as per gRFC.
  • Loading branch information
larry-safran authored Jan 14, 2025
1 parent 7162d2d commit 228dcf7
Show file tree
Hide file tree
Showing 2 changed files with 199 additions and 47 deletions.
159 changes: 116 additions & 43 deletions core/src/main/java/io/grpc/internal/PickFirstLeafLoadBalancer.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
import io.grpc.LoadBalancer;
import io.grpc.Status;
import io.grpc.SynchronizationContext.ScheduledHandle;
import java.net.Inet4Address;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.Collections;
Expand All @@ -58,17 +60,17 @@ final class PickFirstLeafLoadBalancer extends LoadBalancer {
private static final Logger log = Logger.getLogger(PickFirstLeafLoadBalancer.class.getName());
@VisibleForTesting
static final int CONNECTION_DELAY_INTERVAL_MS = 250;
private final boolean enableHappyEyeballs = !isSerializingRetries()
&& PickFirstLoadBalancerProvider.isEnabledHappyEyeballs();
private final Helper helper;
private final Map<SocketAddress, SubchannelData> subchannels = new HashMap<>();
private final Index addressIndex = new Index(ImmutableList.of());
private final Index addressIndex = new Index(ImmutableList.of(), this.enableHappyEyeballs);
private int numTf = 0;
private boolean firstPass = true;
@Nullable
private ScheduledHandle scheduleConnectionTask = null;
private ConnectivityState rawConnectivityState = IDLE;
private ConnectivityState concludedState = IDLE;
private final boolean enableHappyEyeballs = !isSerializingRetries()
&& PickFirstLoadBalancerProvider.isEnabledHappyEyeballs();
private boolean notAPetiolePolicy = true; // means not under a petiole policy
private final BackoffPolicy.Provider bkoffPolProvider = new ExponentialBackoffPolicy.Provider();
private BackoffPolicy reconnectPolicy;
Expand Down Expand Up @@ -610,27 +612,26 @@ public PickResult pickSubchannel(PickSubchannelArgs args) {
}

/**
* Index as in 'i', the pointer to an entry. Not a "search index."
* This contains both an ordered list of addresses and a pointer(i.e. index) to the current entry.
* All updates should be done in a synchronization context.
*/
@VisibleForTesting
static final class Index {
private List<EquivalentAddressGroup> addressGroups;
private int size;
private int groupIndex;
private int addressIndex;
private List<UnwrappedEag> orderedAddresses;
private int activeElement = 0;
private boolean enableHappyEyeballs;

public Index(List<EquivalentAddressGroup> groups) {
Index(List<EquivalentAddressGroup> groups, boolean enableHappyEyeballs) {
this.enableHappyEyeballs = enableHappyEyeballs;
updateGroups(groups);
}

public boolean isValid() {
// Is invalid if empty or has incremented off the end
return groupIndex < addressGroups.size();
return activeElement < orderedAddresses.size();
}

public boolean isAtBeginning() {
return groupIndex == 0 && addressIndex == 0;
return activeElement == 0;
}

/**
Expand All @@ -642,79 +643,150 @@ public boolean increment() {
return false;
}

EquivalentAddressGroup group = addressGroups.get(groupIndex);
addressIndex++;
if (addressIndex >= group.getAddresses().size()) {
groupIndex++;
addressIndex = 0;
return groupIndex < addressGroups.size();
}
activeElement++;

return true;
return isValid();
}

public void reset() {
groupIndex = 0;
addressIndex = 0;
activeElement = 0;
}

public SocketAddress getCurrentAddress() {
if (!isValid()) {
throw new IllegalStateException("Index is past the end of the address group list");
}
return addressGroups.get(groupIndex).getAddresses().get(addressIndex);
return orderedAddresses.get(activeElement).address;
}

public Attributes getCurrentEagAttributes() {
if (!isValid()) {
throw new IllegalStateException("Index is off the end of the address group list");
}
return addressGroups.get(groupIndex).getAttributes();
return orderedAddresses.get(activeElement).attributes;
}

public List<EquivalentAddressGroup> getCurrentEagAsList() {
return Collections.singletonList(
new EquivalentAddressGroup(getCurrentAddress(), getCurrentEagAttributes()));
return Collections.singletonList(getCurrentEag());
}

private EquivalentAddressGroup getCurrentEag() {
if (!isValid()) {
throw new IllegalStateException("Index is past the end of the address group list");
}
return orderedAddresses.get(activeElement).asEag();
}

/**
* Update to new groups, resetting the current index.
*/
public void updateGroups(List<EquivalentAddressGroup> newGroups) {
addressGroups = checkNotNull(newGroups, "newGroups");
checkNotNull(newGroups, "newGroups");
orderedAddresses = enableHappyEyeballs
? updateGroupsHE(newGroups)
: updateGroupsNonHE(newGroups);
reset();
int size = 0;
for (EquivalentAddressGroup eag : newGroups) {
size += eag.getAddresses().size();
}
this.size = size;
}

/**
* Returns false if the needle was not found and the current index was left unchanged.
*/
public boolean seekTo(SocketAddress needle) {
for (int i = 0; i < addressGroups.size(); i++) {
EquivalentAddressGroup group = addressGroups.get(i);
int j = group.getAddresses().indexOf(needle);
if (j == -1) {
continue;
checkNotNull(needle, "needle");
for (int i = 0; i < orderedAddresses.size(); i++) {
if (orderedAddresses.get(i).address.equals(needle)) {
this.activeElement = i;
return true;
}
this.groupIndex = i;
this.addressIndex = j;
return true;
}
return false;
}

public int size() {
return size;
return orderedAddresses.size();
}

private List<UnwrappedEag> updateGroupsNonHE(List<EquivalentAddressGroup> newGroups) {
List<UnwrappedEag> entries = new ArrayList<>();
for (int g = 0; g < newGroups.size(); g++) {
EquivalentAddressGroup eag = newGroups.get(g);
for (int a = 0; a < eag.getAddresses().size(); a++) {
SocketAddress addr = eag.getAddresses().get(a);
entries.add(new UnwrappedEag(eag.getAttributes(), addr));
}
}

return entries;
}

private List<UnwrappedEag> updateGroupsHE(List<EquivalentAddressGroup> newGroups) {
Boolean firstIsV6 = null;
List<UnwrappedEag> v4Entries = new ArrayList<>();
List<UnwrappedEag> v6Entries = new ArrayList<>();
for (int g = 0; g < newGroups.size(); g++) {
EquivalentAddressGroup eag = newGroups.get(g);
for (int a = 0; a < eag.getAddresses().size(); a++) {
SocketAddress addr = eag.getAddresses().get(a);
boolean isIpV4 = addr instanceof InetSocketAddress
&& ((InetSocketAddress) addr).getAddress() instanceof Inet4Address;
if (isIpV4) {
if (firstIsV6 == null) {
firstIsV6 = false;
}
v4Entries.add(new UnwrappedEag(eag.getAttributes(), addr));
} else {
if (firstIsV6 == null) {
firstIsV6 = true;
}
v6Entries.add(new UnwrappedEag(eag.getAttributes(), addr));
}
}
}

return firstIsV6 != null && firstIsV6
? interleave(v6Entries, v4Entries)
: interleave(v4Entries, v6Entries);
}

private List<UnwrappedEag> interleave(List<UnwrappedEag> firstFamily,
List<UnwrappedEag> secondFamily) {
if (firstFamily.isEmpty()) {
return secondFamily;
}
if (secondFamily.isEmpty()) {
return firstFamily;
}

List<UnwrappedEag> result = new ArrayList<>(firstFamily.size() + secondFamily.size());
for (int i = 0; i < Math.max(firstFamily.size(), secondFamily.size()); i++) {
if (i < firstFamily.size()) {
result.add(firstFamily.get(i));
}
if (i < secondFamily.size()) {
result.add(secondFamily.get(i));
}
}
return result;
}

private static final class UnwrappedEag {
private final Attributes attributes;
private final SocketAddress address;

public UnwrappedEag(Attributes attributes, SocketAddress address) {
this.attributes = attributes;
this.address = address;
}

private EquivalentAddressGroup asEag() {
return new EquivalentAddressGroup(address, attributes);
}
}
}

@VisibleForTesting
int getGroupIndex() {
return addressIndex.groupIndex;
int getIndexLocation() {
return addressIndex.activeElement;
}

@VisibleForTesting
Expand Down Expand Up @@ -778,4 +850,5 @@ public PickFirstLeafLoadBalancerConfig(@Nullable Boolean shuffleAddressList) {
this.randomSeed = randomSeed;
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThrows;
import static org.junit.Assume.assumeTrue;
import static org.mockito.AdditionalAnswers.delegatesTo;
import static org.mockito.ArgumentMatchers.any;
Expand Down Expand Up @@ -67,6 +68,7 @@
import io.grpc.Status.Code;
import io.grpc.SynchronizationContext;
import io.grpc.internal.PickFirstLeafLoadBalancer.PickFirstLeafLoadBalancerConfig;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
Expand Down Expand Up @@ -2618,7 +2620,7 @@ public void serialized_retries_two_passes() {
forwardTimeByBackoffDelay(); // should trigger retry again
for (int i = 0; i < subchannels.length; i++) {
inOrder.verify(subchannels[i]).requestConnection();
assertEquals(i, loadBalancer.getGroupIndex());
assertEquals(i, loadBalancer.getIndexLocation());
listeners[i].onSubchannelState(ConnectivityStateInfo.forTransientFailure(error)); // cascade
}
}
Expand All @@ -2637,7 +2639,7 @@ public void index_looping() {
PickFirstLeafLoadBalancer.Index index = new PickFirstLeafLoadBalancer.Index(Arrays.asList(
new EquivalentAddressGroup(Arrays.asList(addr1, addr2), attr1),
new EquivalentAddressGroup(Arrays.asList(addr3), attr2),
new EquivalentAddressGroup(Arrays.asList(addr4, addr5), attr3)));
new EquivalentAddressGroup(Arrays.asList(addr4, addr5), attr3)), enableHappyEyeballs);
assertThat(index.getCurrentAddress()).isSameInstanceAs(addr1);
assertThat(index.getCurrentEagAttributes()).isSameInstanceAs(attr1);
assertThat(index.isAtBeginning()).isTrue();
Expand Down Expand Up @@ -2696,7 +2698,7 @@ public void index_updateGroups_resets() {
SocketAddress addr3 = new FakeSocketAddress("addr3");
PickFirstLeafLoadBalancer.Index index = new PickFirstLeafLoadBalancer.Index(Arrays.asList(
new EquivalentAddressGroup(Arrays.asList(addr1)),
new EquivalentAddressGroup(Arrays.asList(addr2, addr3))));
new EquivalentAddressGroup(Arrays.asList(addr2, addr3))), enableHappyEyeballs);
index.increment();
index.increment();
// We want to make sure both groupIndex and addressIndex are reset
Expand All @@ -2713,7 +2715,7 @@ public void index_seekTo() {
SocketAddress addr3 = new FakeSocketAddress("addr3");
PickFirstLeafLoadBalancer.Index index = new PickFirstLeafLoadBalancer.Index(Arrays.asList(
new EquivalentAddressGroup(Arrays.asList(addr1, addr2)),
new EquivalentAddressGroup(Arrays.asList(addr3))));
new EquivalentAddressGroup(Arrays.asList(addr3))), enableHappyEyeballs);
assertThat(index.seekTo(addr3)).isTrue();
assertThat(index.getCurrentAddress()).isSameInstanceAs(addr3);
assertThat(index.seekTo(addr1)).isTrue();
Expand All @@ -2725,6 +2727,83 @@ public void index_seekTo() {
assertThat(index.getCurrentAddress()).isSameInstanceAs(addr2);
}

@Test
public void index_interleaving() {
InetSocketAddress addr1_6 = new InetSocketAddress("f38:1:1", 1234);
InetSocketAddress addr1_4 = new InetSocketAddress("10.1.1.1", 1234);
InetSocketAddress addr2_4 = new InetSocketAddress("10.1.1.2", 1234);
InetSocketAddress addr3_4 = new InetSocketAddress("10.1.1.3", 1234);
InetSocketAddress addr4_4 = new InetSocketAddress("10.1.1.4", 1234);
InetSocketAddress addr4_6 = new InetSocketAddress("f38:1:4", 1234);

Attributes attrs1 = Attributes.newBuilder().build();
Attributes attrs2 = Attributes.newBuilder().build();
Attributes attrs3 = Attributes.newBuilder().build();
Attributes attrs4 = Attributes.newBuilder().build();

PickFirstLeafLoadBalancer.Index index = new PickFirstLeafLoadBalancer.Index(Arrays.asList(
new EquivalentAddressGroup(Arrays.asList(addr1_4, addr1_6), attrs1),
new EquivalentAddressGroup(Arrays.asList(addr2_4), attrs2),
new EquivalentAddressGroup(Arrays.asList(addr3_4), attrs3),
new EquivalentAddressGroup(Arrays.asList(addr4_4, addr4_6), attrs4)), enableHappyEyeballs);

assertThat(index.getCurrentAddress()).isSameInstanceAs(addr1_4);
assertThat(index.getCurrentEagAttributes()).isSameInstanceAs(attrs1);
assertThat(index.isAtBeginning()).isTrue();

index.increment();
assertThat(index.isValid()).isTrue();
assertThat(index.getCurrentAddress()).isSameInstanceAs(addr1_6);
assertThat(index.getCurrentEagAttributes()).isSameInstanceAs(attrs1);
assertThat(index.isAtBeginning()).isFalse();

index.increment();
assertThat(index.getCurrentAddress()).isSameInstanceAs(addr2_4);
assertThat(index.getCurrentEagAttributes()).isSameInstanceAs(attrs2);

index.increment();
if (enableHappyEyeballs) {
assertThat(index.getCurrentAddress()).isSameInstanceAs(addr4_6);
assertThat(index.getCurrentEagAttributes()).isSameInstanceAs(attrs4);
} else {
assertThat(index.getCurrentAddress()).isSameInstanceAs(addr3_4);
assertThat(index.getCurrentEagAttributes()).isSameInstanceAs(attrs3);
}

index.increment();
if (enableHappyEyeballs) {
assertThat(index.getCurrentAddress()).isSameInstanceAs(addr3_4);
assertThat(index.getCurrentEagAttributes()).isSameInstanceAs(attrs3);
} else {
assertThat(index.getCurrentAddress()).isSameInstanceAs(addr4_4);
assertThat(index.getCurrentEagAttributes()).isSameInstanceAs(attrs4);
}

// Move to last entry
assertThat(index.increment()).isTrue();
assertThat(index.isValid()).isTrue();
if (enableHappyEyeballs) {
assertThat(index.getCurrentAddress()).isSameInstanceAs(addr4_4);
} else {
assertThat(index.getCurrentAddress()).isSameInstanceAs(addr4_6);
}

// Move off of the end
assertThat(index.increment()).isFalse();
assertThat(index.isValid()).isFalse();
assertThrows(IllegalStateException.class, index::getCurrentAddress);

// Reset
index.reset();
assertThat(index.getCurrentAddress()).isSameInstanceAs(addr1_4);
assertThat(index.isAtBeginning()).isTrue();
assertThat(index.isValid()).isTrue();

// Seek to an address
assertThat(index.seekTo(addr4_4)).isTrue();
assertThat(index.getCurrentAddress()).isSameInstanceAs(addr4_4);
}

private static class FakeSocketAddress extends SocketAddress {
final String name;

Expand Down

0 comments on commit 228dcf7

Please sign in to comment.