From 5e1d0d7950bfdfd1e5ca4707ff4db19006dfe489 Mon Sep 17 00:00:00 2001 From: Allard Buijze Date: Wed, 23 Oct 2024 14:54:32 +0200 Subject: [PATCH] Randomized connection order of servers Randomizing the order in which servers are used for connections prevents a misbehaving (e.g. out of sync) first candidate to prevent the application from connecting via other instances in the cluster. --- .../connector/impl/AxonServerManagedChannel.java | 12 ++++++++++-- .../AxonServerManagedChannelIntegrationTest.java | 10 ++++++---- 2 files changed, 16 insertions(+), 6 deletions(-) diff --git a/src/main/java/io/axoniq/axonserver/connector/impl/AxonServerManagedChannel.java b/src/main/java/io/axoniq/axonserver/connector/impl/AxonServerManagedChannel.java index 43ae627e..7168ff9c 100644 --- a/src/main/java/io/axoniq/axonserver/connector/impl/AxonServerManagedChannel.java +++ b/src/main/java/io/axoniq/axonserver/connector/impl/AxonServerManagedChannel.java @@ -31,11 +31,13 @@ import org.slf4j.LoggerFactory; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Queue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -102,7 +104,12 @@ public AxonServerManagedChannel(List routingServers, private ManagedChannel connectChannel() { ManagedChannel connection = null; - for (ServerAddress nodeInfo : routingServers) { + + // reorder the addresses to avoid a misbehaving platform server to prevent connections + List routingCandidates = new ArrayList<>(routingServers); + Collections.shuffle(routingCandidates, ThreadLocalRandom.current()); + + for (ServerAddress nodeInfo : routingCandidates) { ManagedChannel candidate = null; try { candidate = connectionFactory.apply(nodeInfo, context); @@ -214,7 +221,8 @@ public ClientCall newCall(MethodDescriptor met @Override public String authority() { - return routingServers.get(0).toString(); + ManagedChannel activeChannel = this.activeChannel.get(); + return activeChannel != null ? activeChannel.authority() : routingServers.get(0).toString(); } @Override diff --git a/src/test/java/io/axoniq/axonserver/connector/impl/AxonServerManagedChannelIntegrationTest.java b/src/test/java/io/axoniq/axonserver/connector/impl/AxonServerManagedChannelIntegrationTest.java index 3e6736b9..ba647553 100644 --- a/src/test/java/io/axoniq/axonserver/connector/impl/AxonServerManagedChannelIntegrationTest.java +++ b/src/test/java/io/axoniq/axonserver/connector/impl/AxonServerManagedChannelIntegrationTest.java @@ -34,6 +34,7 @@ import java.io.IOException; import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.UUID; import java.util.concurrent.CopyOnWriteArrayList; @@ -203,10 +204,11 @@ void connectionRecoveredOnDisconnection() throws Exception { nextConnectTask.run(); - assertEquals(asList(new ServerAddress("server1"), - new ServerAddress("server2"), - new ServerAddress("server3")), - connectAttempts); + // we don't care in which order the connects happened, as long as all hosts have been tried + assertEquals(new HashSet<>(asList(new ServerAddress("server1"), + new ServerAddress("server2"), + new ServerAddress("server3"))), + new HashSet<>(connectAttempts)); axonServerProxy.enable();