Skip to content

Commit

Permalink
Address PR comments
Browse files Browse the repository at this point in the history
Signed-off-by: Rishabh Maurya <rishabhmaurya05@gmail.com>
  • Loading branch information
rishabhmaurya committed Feb 19, 2025
1 parent 4b23dd6 commit e7cc68f
Show file tree
Hide file tree
Showing 6 changed files with 65 additions and 89 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG-3.0.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Views, simplify data access and manipulation by providing a virtual layer over one or more indices ([#11957](https://github.com/opensearch-project/OpenSearch/pull/11957))
- Added pull-based Ingestion (APIs, for ingestion source, a Kafka plugin, and IngestionEngine that pulls data from the ingestion source) ([#16958](https://github.com/opensearch-project/OpenSearch/pull/16958))
- Added ConfigurationUtils to core for the ease of configuration parsing [#17223](https://github.com/opensearch-project/OpenSearch/pull/17223)
- Arrow Flight RPC plugin with Flight server bootstrap logic and client support for internode communication ([#16962](https://github.com/opensearch-project/OpenSearch/pull/16962))
- Arrow Flight RPC plugin with Flight server bootstrap logic and client for internode communication support ([#16962](https://github.com/opensearch-project/OpenSearch/pull/16962))

### Dependencies
- Update Apache Lucene to 10.1.0 ([#16366](https://github.com/opensearch-project/OpenSearch/pull/16366))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public void setUp() throws Exception {

public void testArrowFlightEndpoint() throws Exception {
for (DiscoveryNode node : getClusterState().nodes()) {
try (FlightClient flightClient = flightClientManager.getFlightClient(node.getId())) {
try (FlightClient flightClient = flightClientManager.getFlightClient(node.getId()).get()) {
assertNotNull(flightClient);
flightClient.handshake(CallOptions.timeout(5000L, TimeUnit.MILLISECONDS));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@

import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
Expand All @@ -58,7 +59,7 @@ public class FlightClientManager implements ClusterStateListener, AutoCloseable
static final int LOCATION_TIMEOUT_MS = 1000;
private final ExecutorService grpcExecutor;
private final ClientConfiguration clientConfig;
private final Map<String, ClientHolder> flightClients = new ConcurrentHashMap<>();
private final Map<String, FlightClient> flightClients = new ConcurrentHashMap<>();
private final Client client;

/**
Expand Down Expand Up @@ -97,20 +98,8 @@ public FlightClientManager(
* @param nodeId The ID of the node for which to retrieve the Flight client
* @return An OpenSearchFlightClient instance for the specified node
*/
public FlightClient getFlightClient(String nodeId) {
ClientHolder clientHolder = flightClients.getOrDefault(nodeId, null);
return clientHolder != null ? clientHolder.flightClient : null;
}

/**
* Returns the location of a Flight client for a given node ID.
*
* @param nodeId The ID of the node for which to retrieve the location
* @return The Location of the Flight client for the specified node
*/
public Location getFlightClientLocation(String nodeId) {
ClientHolder clientHolder = flightClients.getOrDefault(nodeId, null);
return clientHolder != null ? clientHolder.location : null;
public Optional<FlightClient> getFlightClient(String nodeId) {
return Optional.ofNullable(flightClients.get(nodeId));
}

/**
Expand All @@ -129,10 +118,6 @@ public void buildClientAsync(String nodeId) {
requestNodeLocation(nodeId, locationFuture);
}

Map<String, ClientHolder> getClients() {
return flightClients;
}

private void buildClientAndAddToPool(Location location, DiscoveryNode node) {
if (!isValidNode(node)) {
logger.warn(
Expand All @@ -143,36 +128,40 @@ private void buildClientAndAddToPool(Location location, DiscoveryNode node) {
);
return;
}
FlightClient flightClient = buildClient(location);
flightClients.put(node.getId(), new ClientHolder(location, flightClient));
flightClients.computeIfAbsent(node.getId(), key -> buildClient(location));
}

private void requestNodeLocation(String nodeId, CompletableFuture<Location> future) {
NodesFlightInfoRequest request = new NodesFlightInfoRequest(nodeId);
client.execute(NodesFlightInfoAction.INSTANCE, request, new ActionListener<>() {
@Override
public void onResponse(NodesFlightInfoResponse response) {
NodeFlightInfo nodeInfo = response.getNodesMap().get(nodeId);
if (nodeInfo != null) {
TransportAddress publishAddress = nodeInfo.getBoundAddress().publishAddress();
String address = publishAddress.getAddress();
int flightPort = publishAddress.address().getPort();
Location location = clientConfig.sslContextProvider != null
? Location.forGrpcTls(address, flightPort)
: Location.forGrpcInsecure(address, flightPort);

future.complete(location);
} else {
future.completeExceptionally(new IllegalStateException("No Flight info received for node: [" + nodeId + "]"));
try {

client.execute(NodesFlightInfoAction.INSTANCE, request, new ActionListener<>() {
@Override
public void onResponse(NodesFlightInfoResponse response) {
NodeFlightInfo nodeInfo = response.getNodesMap().get(nodeId);
if (nodeInfo != null) {
TransportAddress publishAddress = nodeInfo.getBoundAddress().publishAddress();
String address = publishAddress.getAddress();
int flightPort = publishAddress.address().getPort();
Location location = clientConfig.sslContextProvider != null
? Location.forGrpcTls(address, flightPort)

Check warning on line 147 in plugins/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/bootstrap/FlightClientManager.java

View check run for this annotation

Codecov / codecov/patch

plugins/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/bootstrap/FlightClientManager.java#L147

Added line #L147 was not covered by tests
: Location.forGrpcInsecure(address, flightPort);

future.complete(location);
} else {
future.completeExceptionally(new IllegalStateException("No Flight info received for node: [" + nodeId + "]"));

Check warning on line 152 in plugins/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/bootstrap/FlightClientManager.java

View check run for this annotation

Codecov / codecov/patch

plugins/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/bootstrap/FlightClientManager.java#L152

Added line #L152 was not covered by tests
}
}
}

@Override
public void onFailure(Exception e) {
future.completeExceptionally(e);
logger.error("Failed to get Flight server info for node: [{}] {}", nodeId, e);
}
});
@Override
public void onFailure(Exception e) {
future.completeExceptionally(e);
logger.error("Failed to get Flight server info for node: [{}] {}", nodeId, e);
}
});
} catch (final Exception ex) {
future.completeExceptionally(ex);

Check warning on line 163 in plugins/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/bootstrap/FlightClientManager.java

View check run for this annotation

Codecov / codecov/patch

plugins/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/bootstrap/FlightClientManager.java#L162-L163

Added lines #L162 - L163 were not covered by tests
}
}

private FlightClient buildClient(Location location) {
Expand All @@ -195,15 +184,13 @@ private DiscoveryNode getNodeFromClusterState(String nodeId) {
*/
@Override
public void close() throws Exception {
for (ClientHolder clientHolder : flightClients.values()) {
clientHolder.flightClient.close();
for (FlightClient flightClient : flightClients.values()) {
flightClient.close();
}
flightClients.clear();
grpcExecutor.shutdown();
grpcExecutor.awaitTermination(5, TimeUnit.SECONDS);
}

private record ClientHolder(Location location, FlightClient flightClient) {
clientConfig.clusterService.removeListener(this);
}

/**
Expand Down Expand Up @@ -242,7 +229,7 @@ private Set<String> getCurrentClusterNodes() {
}

@VisibleForTesting
Map<String, ClientHolder> getFlightClients() {
Map<String, FlightClient> getFlightClients() {
return flightClients;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,28 +56,28 @@
final class ServerComponents implements AutoCloseable {

public static final Setting<List<String>> SETTING_FLIGHT_HOST = listSetting(
"flight.host",
"arrow.flight.host",
emptyList(),
Function.identity(),
Setting.Property.NodeScope
);

public static final Setting<List<String>> SETTING_FLIGHT_BIND_HOST = listSetting(
"flight.bind_host",
"arrow.flight.bind_host",
SETTING_FLIGHT_HOST,
Function.identity(),
Setting.Property.NodeScope
);

public static final Setting<List<String>> SETTING_FLIGHT_PUBLISH_HOST = listSetting(
"flight.publish_host",
"arrow.flight.publish_host",
SETTING_FLIGHT_HOST,
Function.identity(),
Setting.Property.NodeScope
);

public static final Setting<Integer> SETTING_FLIGHT_PUBLISH_PORT = intSetting(
"flight.publish_port",
"arrow.flight.publish_port",
-1,
-1,
Setting.Property.NodeScope
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package org.opensearch.arrow.flight.api;/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.arrow.flight.api;

import org.opensearch.Version;
import org.opensearch.action.FailedNodeException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
package org.opensearch.arrow.flight.bootstrap;

import org.apache.arrow.flight.FlightClient;
import org.apache.arrow.flight.Location;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.opensearch.Version;
Expand Down Expand Up @@ -106,9 +105,9 @@ public void setUp() throws Exception {
ClusterChangedEvent event = new ClusterChangedEvent("test", state, ClusterState.EMPTY_STATE);
clientManager.clusterChanged(event);
assertBusy(() -> {
assertEquals("Flight client isn't built in time limit", 2, clientManager.getClients().size());
assertNotNull("local_node should exist", clientManager.getFlightClient("local_node"));
assertNotNull("remote_node should exist", clientManager.getFlightClient("remote_node"));
assertEquals("Flight client isn't built in time limit", 2, clientManager.getFlightClients().size());
assertNotNull("local_node should exist", clientManager.getFlightClient("local_node").get());
assertNotNull("remote_node should exist", clientManager.getFlightClient("remote_node").get());
}, 2, TimeUnit.SECONDS);
}

Expand Down Expand Up @@ -183,16 +182,8 @@ public void testGetFlightClientForExistingNode() {
validateNodes();
}

public void testGetFlightClientLocation() {
for (DiscoveryNode node : state.nodes()) {
Location location = clientManager.getFlightClientLocation(node.getId());
assertNotNull("Flight client location should be returned", location);
assertEquals("Location host should match", node.getHostAddress(), location.getUri().getHost());
}
}

public void testGetFlightClientForNonExistentNode() throws Exception {
assertNull(clientManager.getFlightClient("non_existent_node"));
assertFalse(clientManager.getFlightClient("non_existent_node").isPresent());
}

public void testClusterChangedWithNodesChanged() throws Exception {
Expand All @@ -214,7 +205,7 @@ public void testClusterChangedWithNodesChanged() throws Exception {

for (DiscoveryNode node : newState.nodes()) {
assertBusy(
() -> { assertNotNull("Flight client isn't built in time limit", clientManager.getFlightClient(node.getId())); },
() -> { assertTrue("Flight client isn't built in time limit", clientManager.getFlightClient(node.getId()).isPresent()); },
2,
TimeUnit.SECONDS
);
Expand All @@ -227,7 +218,7 @@ public void testClusterChangedWithNoNodesChanged() throws Exception {

// Verify original client still exists
for (DiscoveryNode node : state.nodes()) {
assertNotNull(clientManager.getFlightClient(node.getId()));
assertNotNull(clientManager.getFlightClient(node.getId()).get());
}
}

Expand All @@ -237,7 +228,7 @@ public void testGetLocalNodeId() throws Exception {

public void testCloseWithActiveClients() throws Exception {
for (DiscoveryNode node : state.nodes()) {
FlightClient client = clientManager.getFlightClient(node.getId());
FlightClient client = clientManager.getFlightClient(node.getId()).get();
assertNotNull(client);
}

Expand Down Expand Up @@ -266,7 +257,7 @@ public void testIncompatibleNodeVersion() throws Exception {
when(clusterService.state()).thenReturn(oldVersionState);
mockFlightInfoResponse(nodes, 0);

assertNull(clientManager.getFlightClient(oldVersionNode.getId()));
assertFalse(clientManager.getFlightClient(oldVersionNode.getId()).isPresent());
}

public void testGetFlightClientLocationTimeout() throws Exception {
Expand All @@ -286,7 +277,7 @@ public void testGetFlightClientLocationTimeout() throws Exception {

ClusterChangedEvent event = new ClusterChangedEvent("test", newState, ClusterState.EMPTY_STATE);
clientManager.clusterChanged(event);
assertNull(clientManager.getFlightClient(nodeId));
assertFalse(clientManager.getFlightClient(nodeId).isPresent());
}

public void testGetFlightClientLocationExecutionError() throws Exception {
Expand All @@ -313,7 +304,7 @@ public void testGetFlightClientLocationExecutionError() throws Exception {
ClusterChangedEvent event = new ClusterChangedEvent("test", newState, ClusterState.EMPTY_STATE);
clientManager.clusterChanged(event);

assertNull(clientManager.getFlightClient(nodeId));
assertFalse(clientManager.getFlightClient(nodeId).isPresent());
}

public void testFailedClusterUpdateButSuccessfulDirectRequest() throws Exception {
Expand Down Expand Up @@ -374,19 +365,15 @@ public void testFailedClusterUpdateButSuccessfulDirectRequest() throws Exception

// Verify that the client can still be created successfully on direct request
clientManager.buildClientAsync(nodeId);
assertBusy(
() -> {
assertNotNull("Flight client should be created successfully on direct request", clientManager.getFlightClient(nodeId));
},
2,
TimeUnit.SECONDS
);
assertBusy(() -> {
assertTrue("Flight client should be created successfully on direct request", clientManager.getFlightClient(nodeId).isPresent());
}, 2, TimeUnit.SECONDS);
assertFalse("first call should be invoked", firstCall.get());
}

private void validateNodes() {
for (DiscoveryNode node : state.nodes()) {
FlightClient client = clientManager.getFlightClient(node.getId());
FlightClient client = clientManager.getFlightClient(node.getId()).get();
assertNotNull("Flight client should be created for existing node", client);
}
}
Expand Down

0 comments on commit e7cc68f

Please sign in to comment.