diff --git a/server/src/main/java/org/opensearch/indices/replication/PrimaryShardReplicationSource.java b/server/src/main/java/org/opensearch/indices/replication/PrimaryShardReplicationSource.java index 941f749ac17d1..b211d81c1c76a 100644 --- a/server/src/main/java/org/opensearch/indices/replication/PrimaryShardReplicationSource.java +++ b/server/src/main/java/org/opensearch/indices/replication/PrimaryShardReplicationSource.java @@ -13,7 +13,6 @@ import org.opensearch.action.ActionListener; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.common.io.stream.Writeable; -import org.opensearch.common.unit.TimeValue; import org.opensearch.index.store.Store; import org.opensearch.index.store.StoreFileMetadata; import org.opensearch.indices.recovery.RecoverySettings; @@ -42,6 +41,7 @@ public class PrimaryShardReplicationSource implements SegmentReplicationSource { private final DiscoveryNode sourceNode; private final DiscoveryNode targetNode; private final String targetAllocationId; + private final RecoverySettings recoverySettings; public PrimaryShardReplicationSource( DiscoveryNode targetNode, @@ -59,6 +59,7 @@ public PrimaryShardReplicationSource( ); this.sourceNode = sourceNode; this.targetNode = targetNode; + this.recoverySettings = recoverySettings; } @Override @@ -83,17 +84,6 @@ public void getSegmentFiles( ) { final Writeable.Reader reader = GetSegmentFilesResponse::new; final ActionListener responseListener = ActionListener.map(listener, r -> r); - // Few of the below assumptions and calculations are added for experimental release of segment replication feature in 2.3 - // version. These will be changed in next release. - - // Storing the size of files to fetch in bytes. - final long sizeOfSegmentFiles = filesToFetch.stream().mapToLong(file -> file.length()).sum(); - - // Maximum size of files to fetch (segment files) in bytes, that can be processed in 1 minute for a m5.xlarge machine. - long baseSegmentFilesSize = 100000000; - - // Formula for calculating time needed to process a replication event's files to fetch process - final long timeToGetSegmentFiles = 1 + (sizeOfSegmentFiles / baseSegmentFilesSize); final GetSegmentFilesRequest request = new GetSegmentFilesRequest( replicationId, targetAllocationId, @@ -102,7 +92,7 @@ public void getSegmentFiles( checkpoint ); final TransportRequestOptions options = TransportRequestOptions.builder() - .withTimeout(TimeValue.timeValueMinutes(timeToGetSegmentFiles)) + .withTimeout(recoverySettings.internalActionLongTimeout()) .build(); transportClient.executeRetryableAction(GET_SEGMENT_FILES, request, options, responseListener, reader); } diff --git a/server/src/test/java/org/opensearch/cluster/coordination/NodeJoinTests.java b/server/src/test/java/org/opensearch/cluster/coordination/NodeJoinTests.java index ec8d5bcf1c687..73c934c0e90b7 100644 --- a/server/src/test/java/org/opensearch/cluster/coordination/NodeJoinTests.java +++ b/server/src/test/java/org/opensearch/cluster/coordination/NodeJoinTests.java @@ -67,6 +67,7 @@ import org.opensearch.transport.TestTransportChannel; import org.opensearch.transport.Transport; import org.opensearch.transport.TransportRequest; +import org.opensearch.transport.TransportRequestOptions; import org.opensearch.transport.TransportResponse; import org.opensearch.transport.TransportService; import org.junit.After; @@ -218,6 +219,17 @@ protected void onSendRequest(long requestId, String action, TransportRequest req super.onSendRequest(requestId, action, request, destination); } } + + @Override + protected void onSendRequest( + long requestId, + String action, + TransportRequest request, + DiscoveryNode destination, + TransportRequestOptions options + ) { + onSendRequest(requestId, action, request, destination); + } }; final ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); TransportService transportService = capturingTransport.createTransportService( diff --git a/server/src/test/java/org/opensearch/indices/replication/PrimaryShardReplicationSourceTests.java b/server/src/test/java/org/opensearch/indices/replication/PrimaryShardReplicationSourceTests.java index 323445bee1274..b904a29f10f11 100644 --- a/server/src/test/java/org/opensearch/indices/replication/PrimaryShardReplicationSourceTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/PrimaryShardReplicationSourceTests.java @@ -50,6 +50,8 @@ public class PrimaryShardReplicationSourceTests extends IndexShardTestCase { private IndexShard indexShard; private DiscoveryNode sourceNode; + private RecoverySettings recoverySettings; + @Override public void setUp() throws Exception { super.setUp(); @@ -73,6 +75,7 @@ public void setUp() throws Exception { indexShard = newStartedShard(true); + this.recoverySettings = recoverySettings; replicationSource = new PrimaryShardReplicationSource( localNode, indexShard.routingEntry().allocationId().toString(), @@ -130,6 +133,34 @@ public void testGetSegmentFiles() { assertTrue(capturedRequest.request instanceof GetSegmentFilesRequest); } + /** + * This test verifies the transport request timeout value for fetching the segment files. + */ + public void testTransportTimeoutForGetSegmentFilesAction() { + long fileSize = (long) (Math.pow(10, 9)); + final ReplicationCheckpoint checkpoint = new ReplicationCheckpoint( + indexShard.shardId(), + PRIMARY_TERM, + SEGMENTS_GEN, + SEQ_NO, + VERSION + ); + StoreFileMetadata testMetadata = new StoreFileMetadata("testFile", fileSize, "checksum", Version.LATEST); + replicationSource.getSegmentFiles( + REPLICATION_ID, + checkpoint, + Arrays.asList(testMetadata), + mock(Store.class), + mock(ActionListener.class) + ); + CapturingTransport.CapturedRequest[] requestList = transport.getCapturedRequestsAndClear(); + assertEquals(1, requestList.length); + CapturingTransport.CapturedRequest capturedRequest = requestList[0]; + assertEquals(SegmentReplicationSourceService.Actions.GET_SEGMENT_FILES, capturedRequest.action); + assertEquals(sourceNode, capturedRequest.node); + assertEquals(recoverySettings.internalActionLongTimeout(), capturedRequest.options.timeout()); + } + public void testGetSegmentFiles_CancelWhileRequestOpen() throws InterruptedException { CountDownLatch latch = new CountDownLatch(1); final ReplicationCheckpoint checkpoint = new ReplicationCheckpoint( diff --git a/test/framework/src/main/java/org/opensearch/test/transport/CapturingTransport.java b/test/framework/src/main/java/org/opensearch/test/transport/CapturingTransport.java index f49f500d1431c..5e2c724e4e1e3 100644 --- a/test/framework/src/main/java/org/opensearch/test/transport/CapturingTransport.java +++ b/test/framework/src/main/java/org/opensearch/test/transport/CapturingTransport.java @@ -36,6 +36,7 @@ import org.opensearch.common.util.concurrent.ConcurrentCollections; import org.opensearch.transport.Transport; import org.opensearch.transport.TransportRequest; +import org.opensearch.transport.TransportRequestOptions; import java.util.ArrayList; import java.util.Collection; @@ -54,12 +55,14 @@ public static class CapturedRequest { public final long requestId; public final String action; public final TransportRequest request; + public final TransportRequestOptions options; - CapturedRequest(DiscoveryNode node, long requestId, String action, TransportRequest request) { + CapturedRequest(DiscoveryNode node, long requestId, String action, TransportRequest request, TransportRequestOptions options) { this.node = node; this.requestId = requestId; this.action = action; this.request = request; + this.options = options; } } @@ -123,6 +126,16 @@ public void clear() { } protected void onSendRequest(long requestId, String action, TransportRequest request, DiscoveryNode node) { - capturedRequests.add(new CapturingTransport.CapturedRequest(node, requestId, action, request)); + capturedRequests.add(new CapturingTransport.CapturedRequest(node, requestId, action, request, null)); + } + + protected void onSendRequest( + long requestId, + String action, + TransportRequest request, + DiscoveryNode node, + TransportRequestOptions options + ) { + capturedRequests.add(new CapturingTransport.CapturedRequest(node, requestId, action, request, options)); } } diff --git a/test/framework/src/main/java/org/opensearch/test/transport/MockTransport.java b/test/framework/src/main/java/org/opensearch/test/transport/MockTransport.java index e1e5bcc968047..5bfc8879ecbc6 100644 --- a/test/framework/src/main/java/org/opensearch/test/transport/MockTransport.java +++ b/test/framework/src/main/java/org/opensearch/test/transport/MockTransport.java @@ -188,13 +188,23 @@ public DiscoveryNode getNode() { public void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options) throws TransportException { requests.put(requestId, Tuple.tuple(node, action)); - onSendRequest(requestId, action, request, node); + onSendRequest(requestId, action, request, node, options); } }; } protected void onSendRequest(long requestId, String action, TransportRequest request, DiscoveryNode node) {} + protected void onSendRequest( + long requestId, + String action, + TransportRequest request, + DiscoveryNode node, + TransportRequestOptions options + ) { + onSendRequest(requestId, action, request, node); + } + @Override public void setMessageListener(TransportMessageListener listener) { if (this.listener != null) {