From e7cc68f5dacb7d26f5c51645bd9202c63b5c3949 Mon Sep 17 00:00:00 2001 From: Rishabh Maurya Date: Tue, 18 Feb 2025 18:07:22 -0800 Subject: [PATCH] Address PR comments Signed-off-by: Rishabh Maurya --- CHANGELOG-3.0.md | 2 +- .../arrow/flight/ArrowFlightServerIT.java | 2 +- .../flight/bootstrap/FlightClientManager.java | 85 ++++++++----------- .../flight/bootstrap/ServerComponents.java | 8 +- .../TransportNodesFlightInfoActionTests.java | 16 ++-- .../bootstrap/FlightClientManagerTests.java | 41 +++------ 6 files changed, 65 insertions(+), 89 deletions(-) diff --git a/CHANGELOG-3.0.md b/CHANGELOG-3.0.md index 7f74bbafff8e5..971a785d9ddf6 100644 --- a/CHANGELOG-3.0.md +++ b/CHANGELOG-3.0.md @@ -16,7 +16,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Views, simplify data access and manipulation by providing a virtual layer over one or more indices ([#11957](https://github.com/opensearch-project/OpenSearch/pull/11957)) - Added pull-based Ingestion (APIs, for ingestion source, a Kafka plugin, and IngestionEngine that pulls data from the ingestion source) ([#16958](https://github.com/opensearch-project/OpenSearch/pull/16958)) - Added ConfigurationUtils to core for the ease of configuration parsing [#17223](https://github.com/opensearch-project/OpenSearch/pull/17223) -- Arrow Flight RPC plugin with Flight server bootstrap logic and client support for internode communication ([#16962](https://github.com/opensearch-project/OpenSearch/pull/16962)) +- Arrow Flight RPC plugin with Flight server bootstrap logic and client for internode communication support ([#16962](https://github.com/opensearch-project/OpenSearch/pull/16962)) ### Dependencies - Update Apache Lucene to 10.1.0 ([#16366](https://github.com/opensearch-project/OpenSearch/pull/16366)) diff --git a/plugins/arrow-flight-rpc/src/internalClusterTest/java/org/opensearch/arrow/flight/ArrowFlightServerIT.java b/plugins/arrow-flight-rpc/src/internalClusterTest/java/org/opensearch/arrow/flight/ArrowFlightServerIT.java index 35dedde0bfc62..bcad335c7a917 100644 --- a/plugins/arrow-flight-rpc/src/internalClusterTest/java/org/opensearch/arrow/flight/ArrowFlightServerIT.java +++ b/plugins/arrow-flight-rpc/src/internalClusterTest/java/org/opensearch/arrow/flight/ArrowFlightServerIT.java @@ -50,7 +50,7 @@ public void setUp() throws Exception { public void testArrowFlightEndpoint() throws Exception { for (DiscoveryNode node : getClusterState().nodes()) { - try (FlightClient flightClient = flightClientManager.getFlightClient(node.getId())) { + try (FlightClient flightClient = flightClientManager.getFlightClient(node.getId()).get()) { assertNotNull(flightClient); flightClient.handshake(CallOptions.timeout(5000L, TimeUnit.MILLISECONDS)); } diff --git a/plugins/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/bootstrap/FlightClientManager.java b/plugins/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/bootstrap/FlightClientManager.java index a8565fd248327..a81033f580a03 100644 --- a/plugins/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/bootstrap/FlightClientManager.java +++ b/plugins/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/bootstrap/FlightClientManager.java @@ -34,6 +34,7 @@ import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; @@ -58,7 +59,7 @@ public class FlightClientManager implements ClusterStateListener, AutoCloseable static final int LOCATION_TIMEOUT_MS = 1000; private final ExecutorService grpcExecutor; private final ClientConfiguration clientConfig; - private final Map flightClients = new ConcurrentHashMap<>(); + private final Map flightClients = new ConcurrentHashMap<>(); private final Client client; /** @@ -97,20 +98,8 @@ public FlightClientManager( * @param nodeId The ID of the node for which to retrieve the Flight client * @return An OpenSearchFlightClient instance for the specified node */ - public FlightClient getFlightClient(String nodeId) { - ClientHolder clientHolder = flightClients.getOrDefault(nodeId, null); - return clientHolder != null ? clientHolder.flightClient : null; - } - - /** - * Returns the location of a Flight client for a given node ID. - * - * @param nodeId The ID of the node for which to retrieve the location - * @return The Location of the Flight client for the specified node - */ - public Location getFlightClientLocation(String nodeId) { - ClientHolder clientHolder = flightClients.getOrDefault(nodeId, null); - return clientHolder != null ? clientHolder.location : null; + public Optional getFlightClient(String nodeId) { + return Optional.ofNullable(flightClients.get(nodeId)); } /** @@ -129,10 +118,6 @@ public void buildClientAsync(String nodeId) { requestNodeLocation(nodeId, locationFuture); } - Map getClients() { - return flightClients; - } - private void buildClientAndAddToPool(Location location, DiscoveryNode node) { if (!isValidNode(node)) { logger.warn( @@ -143,36 +128,40 @@ private void buildClientAndAddToPool(Location location, DiscoveryNode node) { ); return; } - FlightClient flightClient = buildClient(location); - flightClients.put(node.getId(), new ClientHolder(location, flightClient)); + flightClients.computeIfAbsent(node.getId(), key -> buildClient(location)); } private void requestNodeLocation(String nodeId, CompletableFuture future) { NodesFlightInfoRequest request = new NodesFlightInfoRequest(nodeId); - client.execute(NodesFlightInfoAction.INSTANCE, request, new ActionListener<>() { - @Override - public void onResponse(NodesFlightInfoResponse response) { - NodeFlightInfo nodeInfo = response.getNodesMap().get(nodeId); - if (nodeInfo != null) { - TransportAddress publishAddress = nodeInfo.getBoundAddress().publishAddress(); - String address = publishAddress.getAddress(); - int flightPort = publishAddress.address().getPort(); - Location location = clientConfig.sslContextProvider != null - ? Location.forGrpcTls(address, flightPort) - : Location.forGrpcInsecure(address, flightPort); - - future.complete(location); - } else { - future.completeExceptionally(new IllegalStateException("No Flight info received for node: [" + nodeId + "]")); + try { + + client.execute(NodesFlightInfoAction.INSTANCE, request, new ActionListener<>() { + @Override + public void onResponse(NodesFlightInfoResponse response) { + NodeFlightInfo nodeInfo = response.getNodesMap().get(nodeId); + if (nodeInfo != null) { + TransportAddress publishAddress = nodeInfo.getBoundAddress().publishAddress(); + String address = publishAddress.getAddress(); + int flightPort = publishAddress.address().getPort(); + Location location = clientConfig.sslContextProvider != null + ? Location.forGrpcTls(address, flightPort) + : Location.forGrpcInsecure(address, flightPort); + + future.complete(location); + } else { + future.completeExceptionally(new IllegalStateException("No Flight info received for node: [" + nodeId + "]")); + } } - } - @Override - public void onFailure(Exception e) { - future.completeExceptionally(e); - logger.error("Failed to get Flight server info for node: [{}] {}", nodeId, e); - } - }); + @Override + public void onFailure(Exception e) { + future.completeExceptionally(e); + logger.error("Failed to get Flight server info for node: [{}] {}", nodeId, e); + } + }); + } catch (final Exception ex) { + future.completeExceptionally(ex); + } } private FlightClient buildClient(Location location) { @@ -195,15 +184,13 @@ private DiscoveryNode getNodeFromClusterState(String nodeId) { */ @Override public void close() throws Exception { - for (ClientHolder clientHolder : flightClients.values()) { - clientHolder.flightClient.close(); + for (FlightClient flightClient : flightClients.values()) { + flightClient.close(); } flightClients.clear(); grpcExecutor.shutdown(); grpcExecutor.awaitTermination(5, TimeUnit.SECONDS); - } - - private record ClientHolder(Location location, FlightClient flightClient) { + clientConfig.clusterService.removeListener(this); } /** @@ -242,7 +229,7 @@ private Set getCurrentClusterNodes() { } @VisibleForTesting - Map getFlightClients() { + Map getFlightClients() { return flightClients; } diff --git a/plugins/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/bootstrap/ServerComponents.java b/plugins/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/bootstrap/ServerComponents.java index 93970c04d24a6..06b8b6bd4d35c 100644 --- a/plugins/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/bootstrap/ServerComponents.java +++ b/plugins/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/bootstrap/ServerComponents.java @@ -56,28 +56,28 @@ final class ServerComponents implements AutoCloseable { public static final Setting> SETTING_FLIGHT_HOST = listSetting( - "flight.host", + "arrow.flight.host", emptyList(), Function.identity(), Setting.Property.NodeScope ); public static final Setting> SETTING_FLIGHT_BIND_HOST = listSetting( - "flight.bind_host", + "arrow.flight.bind_host", SETTING_FLIGHT_HOST, Function.identity(), Setting.Property.NodeScope ); public static final Setting> SETTING_FLIGHT_PUBLISH_HOST = listSetting( - "flight.publish_host", + "arrow.flight.publish_host", SETTING_FLIGHT_HOST, Function.identity(), Setting.Property.NodeScope ); public static final Setting SETTING_FLIGHT_PUBLISH_PORT = intSetting( - "flight.publish_port", + "arrow.flight.publish_port", -1, -1, Setting.Property.NodeScope diff --git a/plugins/arrow-flight-rpc/src/test/java/org/opensearch/arrow/flight/api/TransportNodesFlightInfoActionTests.java b/plugins/arrow-flight-rpc/src/test/java/org/opensearch/arrow/flight/api/TransportNodesFlightInfoActionTests.java index 615c3905b135a..d9d8af5920d61 100644 --- a/plugins/arrow-flight-rpc/src/test/java/org/opensearch/arrow/flight/api/TransportNodesFlightInfoActionTests.java +++ b/plugins/arrow-flight-rpc/src/test/java/org/opensearch/arrow/flight/api/TransportNodesFlightInfoActionTests.java @@ -1,10 +1,12 @@ -package org.opensearch.arrow.flight.api;/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.arrow.flight.api; import org.opensearch.Version; import org.opensearch.action.FailedNodeException; diff --git a/plugins/arrow-flight-rpc/src/test/java/org/opensearch/arrow/flight/bootstrap/FlightClientManagerTests.java b/plugins/arrow-flight-rpc/src/test/java/org/opensearch/arrow/flight/bootstrap/FlightClientManagerTests.java index b722813b7e394..acc32d6b32f4c 100644 --- a/plugins/arrow-flight-rpc/src/test/java/org/opensearch/arrow/flight/bootstrap/FlightClientManagerTests.java +++ b/plugins/arrow-flight-rpc/src/test/java/org/opensearch/arrow/flight/bootstrap/FlightClientManagerTests.java @@ -8,7 +8,6 @@ package org.opensearch.arrow.flight.bootstrap; import org.apache.arrow.flight.FlightClient; -import org.apache.arrow.flight.Location; import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.memory.RootAllocator; import org.opensearch.Version; @@ -106,9 +105,9 @@ public void setUp() throws Exception { ClusterChangedEvent event = new ClusterChangedEvent("test", state, ClusterState.EMPTY_STATE); clientManager.clusterChanged(event); assertBusy(() -> { - assertEquals("Flight client isn't built in time limit", 2, clientManager.getClients().size()); - assertNotNull("local_node should exist", clientManager.getFlightClient("local_node")); - assertNotNull("remote_node should exist", clientManager.getFlightClient("remote_node")); + assertEquals("Flight client isn't built in time limit", 2, clientManager.getFlightClients().size()); + assertNotNull("local_node should exist", clientManager.getFlightClient("local_node").get()); + assertNotNull("remote_node should exist", clientManager.getFlightClient("remote_node").get()); }, 2, TimeUnit.SECONDS); } @@ -183,16 +182,8 @@ public void testGetFlightClientForExistingNode() { validateNodes(); } - public void testGetFlightClientLocation() { - for (DiscoveryNode node : state.nodes()) { - Location location = clientManager.getFlightClientLocation(node.getId()); - assertNotNull("Flight client location should be returned", location); - assertEquals("Location host should match", node.getHostAddress(), location.getUri().getHost()); - } - } - public void testGetFlightClientForNonExistentNode() throws Exception { - assertNull(clientManager.getFlightClient("non_existent_node")); + assertFalse(clientManager.getFlightClient("non_existent_node").isPresent()); } public void testClusterChangedWithNodesChanged() throws Exception { @@ -214,7 +205,7 @@ public void testClusterChangedWithNodesChanged() throws Exception { for (DiscoveryNode node : newState.nodes()) { assertBusy( - () -> { assertNotNull("Flight client isn't built in time limit", clientManager.getFlightClient(node.getId())); }, + () -> { assertTrue("Flight client isn't built in time limit", clientManager.getFlightClient(node.getId()).isPresent()); }, 2, TimeUnit.SECONDS ); @@ -227,7 +218,7 @@ public void testClusterChangedWithNoNodesChanged() throws Exception { // Verify original client still exists for (DiscoveryNode node : state.nodes()) { - assertNotNull(clientManager.getFlightClient(node.getId())); + assertNotNull(clientManager.getFlightClient(node.getId()).get()); } } @@ -237,7 +228,7 @@ public void testGetLocalNodeId() throws Exception { public void testCloseWithActiveClients() throws Exception { for (DiscoveryNode node : state.nodes()) { - FlightClient client = clientManager.getFlightClient(node.getId()); + FlightClient client = clientManager.getFlightClient(node.getId()).get(); assertNotNull(client); } @@ -266,7 +257,7 @@ public void testIncompatibleNodeVersion() throws Exception { when(clusterService.state()).thenReturn(oldVersionState); mockFlightInfoResponse(nodes, 0); - assertNull(clientManager.getFlightClient(oldVersionNode.getId())); + assertFalse(clientManager.getFlightClient(oldVersionNode.getId()).isPresent()); } public void testGetFlightClientLocationTimeout() throws Exception { @@ -286,7 +277,7 @@ public void testGetFlightClientLocationTimeout() throws Exception { ClusterChangedEvent event = new ClusterChangedEvent("test", newState, ClusterState.EMPTY_STATE); clientManager.clusterChanged(event); - assertNull(clientManager.getFlightClient(nodeId)); + assertFalse(clientManager.getFlightClient(nodeId).isPresent()); } public void testGetFlightClientLocationExecutionError() throws Exception { @@ -313,7 +304,7 @@ public void testGetFlightClientLocationExecutionError() throws Exception { ClusterChangedEvent event = new ClusterChangedEvent("test", newState, ClusterState.EMPTY_STATE); clientManager.clusterChanged(event); - assertNull(clientManager.getFlightClient(nodeId)); + assertFalse(clientManager.getFlightClient(nodeId).isPresent()); } public void testFailedClusterUpdateButSuccessfulDirectRequest() throws Exception { @@ -374,19 +365,15 @@ public void testFailedClusterUpdateButSuccessfulDirectRequest() throws Exception // Verify that the client can still be created successfully on direct request clientManager.buildClientAsync(nodeId); - assertBusy( - () -> { - assertNotNull("Flight client should be created successfully on direct request", clientManager.getFlightClient(nodeId)); - }, - 2, - TimeUnit.SECONDS - ); + assertBusy(() -> { + assertTrue("Flight client should be created successfully on direct request", clientManager.getFlightClient(nodeId).isPresent()); + }, 2, TimeUnit.SECONDS); assertFalse("first call should be invoked", firstCall.get()); } private void validateNodes() { for (DiscoveryNode node : state.nodes()) { - FlightClient client = clientManager.getFlightClient(node.getId()); + FlightClient client = clientManager.getFlightClient(node.getId()).get(); assertNotNull("Flight client should be created for existing node", client); } }