Skip to content

Commit

Permalink
[Segment Replication] Add new segrep settings (opensearch-project#6523)
Browse files Browse the repository at this point in the history
* [Segment Replication] Add segrep settings

Signed-off-by: Suraj Singh <surajrider@gmail.com>

* Spotless fix

Signed-off-by: Suraj Singh <surajrider@gmail.com>

* Address review comments

Signed-off-by: Suraj Singh <surajrider@gmail.com>

---------

Signed-off-by: Suraj Singh <surajrider@gmail.com>
Signed-off-by: Mingshi Liu <mingshl@amazon.com>
  • Loading branch information
dreamer-89 authored and mingshl committed Mar 24, 2023
1 parent f1b9b88 commit 160a1c1
Show file tree
Hide file tree
Showing 5 changed files with 72 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import org.opensearch.action.ActionListener;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.io.stream.Writeable;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.index.store.Store;
import org.opensearch.index.store.StoreFileMetadata;
import org.opensearch.indices.recovery.RecoverySettings;
Expand Down Expand Up @@ -42,6 +41,7 @@ public class PrimaryShardReplicationSource implements SegmentReplicationSource {
private final DiscoveryNode sourceNode;
private final DiscoveryNode targetNode;
private final String targetAllocationId;
private final RecoverySettings recoverySettings;

public PrimaryShardReplicationSource(
DiscoveryNode targetNode,
Expand All @@ -59,6 +59,7 @@ public PrimaryShardReplicationSource(
);
this.sourceNode = sourceNode;
this.targetNode = targetNode;
this.recoverySettings = recoverySettings;
}

@Override
Expand All @@ -83,17 +84,6 @@ public void getSegmentFiles(
) {
final Writeable.Reader<GetSegmentFilesResponse> reader = GetSegmentFilesResponse::new;
final ActionListener<GetSegmentFilesResponse> responseListener = ActionListener.map(listener, r -> r);
// Few of the below assumptions and calculations are added for experimental release of segment replication feature in 2.3
// version. These will be changed in next release.

// Storing the size of files to fetch in bytes.
final long sizeOfSegmentFiles = filesToFetch.stream().mapToLong(file -> file.length()).sum();

// Maximum size of files to fetch (segment files) in bytes, that can be processed in 1 minute for a m5.xlarge machine.
long baseSegmentFilesSize = 100000000;

// Formula for calculating time needed to process a replication event's files to fetch process
final long timeToGetSegmentFiles = 1 + (sizeOfSegmentFiles / baseSegmentFilesSize);
final GetSegmentFilesRequest request = new GetSegmentFilesRequest(
replicationId,
targetAllocationId,
Expand All @@ -102,7 +92,7 @@ public void getSegmentFiles(
checkpoint
);
final TransportRequestOptions options = TransportRequestOptions.builder()
.withTimeout(TimeValue.timeValueMinutes(timeToGetSegmentFiles))
.withTimeout(recoverySettings.internalActionLongTimeout())
.build();
transportClient.executeRetryableAction(GET_SEGMENT_FILES, request, options, responseListener, reader);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
import org.opensearch.transport.TestTransportChannel;
import org.opensearch.transport.Transport;
import org.opensearch.transport.TransportRequest;
import org.opensearch.transport.TransportRequestOptions;
import org.opensearch.transport.TransportResponse;
import org.opensearch.transport.TransportService;
import org.junit.After;
Expand Down Expand Up @@ -218,6 +219,17 @@ protected void onSendRequest(long requestId, String action, TransportRequest req
super.onSendRequest(requestId, action, request, destination);
}
}

@Override
protected void onSendRequest(
long requestId,
String action,
TransportRequest request,
DiscoveryNode destination,
TransportRequestOptions options
) {
onSendRequest(requestId, action, request, destination);
}
};
final ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
TransportService transportService = capturingTransport.createTransportService(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ public class PrimaryShardReplicationSourceTests extends IndexShardTestCase {
private IndexShard indexShard;
private DiscoveryNode sourceNode;

private RecoverySettings recoverySettings;

@Override
public void setUp() throws Exception {
super.setUp();
Expand All @@ -73,6 +75,7 @@ public void setUp() throws Exception {

indexShard = newStartedShard(true);

this.recoverySettings = recoverySettings;
replicationSource = new PrimaryShardReplicationSource(
localNode,
indexShard.routingEntry().allocationId().toString(),
Expand Down Expand Up @@ -130,6 +133,34 @@ public void testGetSegmentFiles() {
assertTrue(capturedRequest.request instanceof GetSegmentFilesRequest);
}

/**
* This test verifies the transport request timeout value for fetching the segment files.
*/
public void testTransportTimeoutForGetSegmentFilesAction() {
long fileSize = (long) (Math.pow(10, 9));
final ReplicationCheckpoint checkpoint = new ReplicationCheckpoint(
indexShard.shardId(),
PRIMARY_TERM,
SEGMENTS_GEN,
SEQ_NO,
VERSION
);
StoreFileMetadata testMetadata = new StoreFileMetadata("testFile", fileSize, "checksum", Version.LATEST);
replicationSource.getSegmentFiles(
REPLICATION_ID,
checkpoint,
Arrays.asList(testMetadata),
mock(Store.class),
mock(ActionListener.class)
);
CapturingTransport.CapturedRequest[] requestList = transport.getCapturedRequestsAndClear();
assertEquals(1, requestList.length);
CapturingTransport.CapturedRequest capturedRequest = requestList[0];
assertEquals(SegmentReplicationSourceService.Actions.GET_SEGMENT_FILES, capturedRequest.action);
assertEquals(sourceNode, capturedRequest.node);
assertEquals(recoverySettings.internalActionLongTimeout(), capturedRequest.options.timeout());
}

public void testGetSegmentFiles_CancelWhileRequestOpen() throws InterruptedException {
CountDownLatch latch = new CountDownLatch(1);
final ReplicationCheckpoint checkpoint = new ReplicationCheckpoint(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.opensearch.common.util.concurrent.ConcurrentCollections;
import org.opensearch.transport.Transport;
import org.opensearch.transport.TransportRequest;
import org.opensearch.transport.TransportRequestOptions;

import java.util.ArrayList;
import java.util.Collection;
Expand All @@ -54,12 +55,14 @@ public static class CapturedRequest {
public final long requestId;
public final String action;
public final TransportRequest request;
public final TransportRequestOptions options;

CapturedRequest(DiscoveryNode node, long requestId, String action, TransportRequest request) {
CapturedRequest(DiscoveryNode node, long requestId, String action, TransportRequest request, TransportRequestOptions options) {
this.node = node;
this.requestId = requestId;
this.action = action;
this.request = request;
this.options = options;
}
}

Expand Down Expand Up @@ -123,6 +126,16 @@ public void clear() {
}

protected void onSendRequest(long requestId, String action, TransportRequest request, DiscoveryNode node) {
capturedRequests.add(new CapturingTransport.CapturedRequest(node, requestId, action, request));
capturedRequests.add(new CapturingTransport.CapturedRequest(node, requestId, action, request, null));
}

protected void onSendRequest(
long requestId,
String action,
TransportRequest request,
DiscoveryNode node,
TransportRequestOptions options
) {
capturedRequests.add(new CapturingTransport.CapturedRequest(node, requestId, action, request, options));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -188,13 +188,23 @@ public DiscoveryNode getNode() {
public void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options)
throws TransportException {
requests.put(requestId, Tuple.tuple(node, action));
onSendRequest(requestId, action, request, node);
onSendRequest(requestId, action, request, node, options);
}
};
}

protected void onSendRequest(long requestId, String action, TransportRequest request, DiscoveryNode node) {}

protected void onSendRequest(
long requestId,
String action,
TransportRequest request,
DiscoveryNode node,
TransportRequestOptions options
) {
onSendRequest(requestId, action, request, node);
}

@Override
public void setMessageListener(TransportMessageListener listener) {
if (this.listener != null) {
Expand Down

0 comments on commit 160a1c1

Please sign in to comment.