Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize indexing performance in replica shard #17371

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
### Changed
- Convert transport-reactor-netty4 to use gradle version catalog [#17233](https://github.com/opensearch-project/OpenSearch/pull/17233)
- Increase force merge threads to 1/8th of cores [#17255](https://github.com/opensearch-project/OpenSearch/pull/17255)
- Optimize indexing performance in replica shard [#17371](https://github.com/opensearch-project/OpenSearch/pull/17371)

### Deprecated

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
import org.opensearch.common.settings.Settings;
import org.opensearch.common.xcontent.json.JsonXContent;
import org.opensearch.common.xcontent.support.XContentMapValues;
import org.opensearch.core.common.Strings;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.seqno.SeqNoStats;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.core.rest.RestStatus;
Expand Down Expand Up @@ -465,6 +465,90 @@ public void testSyncedFlushTransition() throws Exception {
}
}

/**
 * Creates a one-shard index with two replicas, seeds it with {@code docCount} documents, then lets five
 * threads index overlapping id ranges concurrently (each randomly followed by a flush). Afterwards it
 * checks the sequence numbers on all shard copies and compares the hits returned by the primary with
 * the hits returned by the replicas.
 *
 * @throws Exception if cluster setup, indexing, or any of the verification requests fail
 */
public void testReplicasUsePrimaryIndexingStrategy() throws Exception {
    Nodes nodes = buildNodeAndVersions();
    logger.info("cluster discovered:\n {}", nodes.toString());
    Settings.Builder settings = Settings.builder()
        .put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1)
        .put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), "1m")
        .put(IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 2);
    final String index = "test-index";
    createIndex(index, settings.build());
    ensureNoInitializingShards(); // wait for all other shard activity to finish
    ensureGreen(index);

    final int docCount = 200;
    try (
        RestClient nodeClient = buildClient(
            restClientSettings(),
            nodes.getNewNodes().stream().map(Node::getPublishAddress).toArray(HttpHost[]::new)
        )
    ) {
        indexDocs(index, 0, docCount);

        Thread[] indexThreads = new Thread[5];
        for (int i = 0; i < indexThreads.length; i++) {
            indexThreads[i] = new Thread(() -> {
                try {
                    int idStart = randomInt(docCount / 2);
                    // NOTE(review): if the third argument of indexDocs is a document count rather than an
                    // exclusive end id, this can create ids >= docCount and break the count assertions
                    // below - confirm against the indexDocs helper.
                    indexDocs(index, idStart, idStart + docCount / 2);
                    if (randomBoolean()) {
                        // randomly flush while other threads are still indexing
                        assertOK(client().performRequest(new Request("POST", index + "/_flush")));
                    }
                } catch (IOException e) {
                    // keep the original exception as the cause so the failing request can be diagnosed
                    throw new AssertionError("failed while indexing [" + e.getMessage() + "]", e);
                }
            });
            indexThreads[i].start();
        }
        for (Thread indexThread : indexThreads) {
            indexThread.join();
        }
        if (randomBoolean()) {
            // randomly flush once more after all indexing threads have finished
            assertOK(client().performRequest(new Request("POST", index + "/_flush")));
        }
        // NOTE(review): the refresh interval is 1m and no explicit _refresh is issued here; confirm that
        // the searches in assertSourceEqualWithPrimary are guaranteed to see every indexed document.
        // verify replica catch up with primary
        assertSeqNoOnShards(index, nodes, docCount, nodeClient);
        assertSourceEqualWithPrimary(index, docCount);
    }
}

/**
 * Asserts that a search served by replica copies returns exactly the same hits as a search served by the
 * primary, and that both report {@code expectedCount} documents.
 *
 * @param index         name of the index to search
 * @param expectedCount number of documents both searches are expected to return
 * @throws IOException if one of the search requests fails
 */
private void assertSourceEqualWithPrimary(final String index, final int expectedCount) throws IOException {
    final Map<String, Object> primaryHits = searchHitsWithPreference(index, "_primary", expectedCount);
    final List<Object> primarySources = ObjectPath.evaluate(primaryHits, "hits");
    assertEquals(expectedCount, primarySources.size());

    // index the primary hits by document id so each replica hit can be matched to its counterpart
    final Map<String, Object> primaryHitsById = new HashMap<>(expectedCount);
    for (Object primaryHit : primarySources) {
        final String id = ObjectPath.evaluate(primaryHit, "_id");
        primaryHitsById.put(id, primaryHit);
    }

    // the total is now read from the replica response itself (previously the primary response was re-read)
    final Map<String, Object> replicaHits = searchHitsWithPreference(index, "_replica", expectedCount);
    final List<Object> replicaSources = ObjectPath.evaluate(replicaHits, "hits");
    assertEquals(expectedCount, replicaSources.size());

    for (Object replicaHit : replicaSources) {
        final String id = ObjectPath.evaluate(replicaHit, "_id").toString();
        assertEquals("hit [" + id + "] differs between primary and replica", primaryHitsById.get(id), replicaHit);
    }
}

/**
 * Runs a search against {@code index} with the given {@code preference}, asserts the reported total and
 * returns the top-level {@code hits} object of the response.
 */
private Map<String, Object> searchHitsWithPreference(final String index, final String preference, final int expectedCount)
    throws IOException {
    final Request request = new Request("GET", index + "/_search");
    request.addParameter("preference", preference);
    // ask for more than expected so that surplus documents would show up in the size assertions
    request.addParameter("size", String.valueOf(expectedCount + 100));
    final Response response = client().performRequest(request);

    final Map<String, Object> hits = ObjectPath.createFromResponse(response).evaluate("hits");
    final Map<String, Object> total = ObjectPath.evaluate(hits, "total");
    // the search response reports the count under hits.total.value
    assertEquals(expectedCount, total.get("value"));
    return hits;
}

private void assertCount(final String index, final String preference, final int expectedCount) throws IOException {
Request request = new Request("GET", index + "/_count");
request.addParameter("preference", preference);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.engine.InternalEngine;
import org.opensearch.index.mapper.MapperService;
import org.opensearch.index.seqno.SequenceNumbers;

Expand Down Expand Up @@ -392,6 +393,10 @@ protected static void parseInnerToXContent(XContentParser parser, Builder contex
}
}

/**
 * Returns the engine write strategy attached to this response.
 * <p>
 * This base implementation carries no strategy and always returns {@code null}; callers must
 * handle a {@code null} result.
 *
 * @return the write strategy, or {@code null} when none is available
 */
public InternalEngine.WriteStrategy writeStrategy() {
    return null;
}

/**
* Base class of all {@link DocWriteResponse} builders. These {@link DocWriteResponse.Builder} are used during
* xcontent parsing to temporarily store the parsed values, then the {@link Builder#build()} method is called to
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,8 @@ public void markOperationAsExecuted(Engine.Result result) {
result.getSeqNo(),
result.getTerm(),
indexResult.getVersion(),
indexResult.isCreated()
indexResult.isCreated(),
indexResult.indexingStrategy()
);
} else if (result.getOperationType() == Engine.Operation.TYPE.DELETE) {
Engine.DeleteResult deleteResult = (Engine.DeleteResult) result;
Expand All @@ -286,7 +287,8 @@ public void markOperationAsExecuted(Engine.Result result) {
deleteResult.getSeqNo(),
result.getTerm(),
deleteResult.getVersion(),
deleteResult.isFound()
deleteResult.isFound(),
deleteResult.deletionStrategy()
);

} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@
import org.opensearch.index.IndexingPressureService;
import org.opensearch.index.SegmentReplicationPressureService;
import org.opensearch.index.engine.Engine;
import org.opensearch.index.engine.InternalEngine;
import org.opensearch.index.engine.VersionConflictEngineException;
import org.opensearch.index.get.GetResult;
import org.opensearch.index.mapper.MapperException;
Expand Down Expand Up @@ -751,7 +752,8 @@ static BulkItemResponse processUpdateResponse(
indexResponse.getSeqNo(),
indexResponse.getPrimaryTerm(),
indexResponse.getVersion(),
indexResponse.getResult()
indexResponse.getResult(),
indexResponse.writeStrategy()
);

if (updateRequest.fetchSource() != null && updateRequest.fetchSource().fetchSource()) {
Expand Down Expand Up @@ -783,7 +785,8 @@ static BulkItemResponse processUpdateResponse(
deleteResponse.getSeqNo(),
deleteResponse.getPrimaryTerm(),
deleteResponse.getVersion(),
deleteResponse.getResult()
deleteResponse.getResult(),
deleteResponse.writeStrategy()
);

final GetResult getResult = UpdateHelper.extractGetResult(
Expand Down Expand Up @@ -880,7 +883,8 @@ private static Engine.Result performOpOnReplica(
primaryResponse.getVersion(),
indexRequest.getAutoGeneratedTimestamp(),
indexRequest.isRetry(),
sourceToParse
sourceToParse,
(InternalEngine.IndexingStrategy) primaryResponse.writeStrategy()
);
break;
case DELETE:
Expand All @@ -889,7 +893,8 @@ private static Engine.Result performOpOnReplica(
primaryResponse.getSeqNo(),
primaryResponse.getPrimaryTerm(),
primaryResponse.getVersion(),
deleteRequest.id()
deleteRequest.id(),
(InternalEngine.DeletionStrategy) primaryResponse.writeStrategy()
);
break;
default:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,16 @@
import org.opensearch.action.DocWriteResponse;
import org.opensearch.common.annotation.PublicApi;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.index.engine.InternalEngine;
import org.opensearch.transport.client.Client;

import java.io.IOException;

import static org.opensearch.Version.V_3_0_0;
import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken;

/**
Expand All @@ -54,21 +57,53 @@
*/
@PublicApi(since = "1.0.0")
public class DeleteResponse extends DocWriteResponse {
private final InternalEngine.DeletionStrategy deletionStrategy;

/**
 * Reads a response from {@code in} for a known shard, followed by the optional deletion strategy.
 *
 * @param shardId shard the response belongs to
 * @param in      stream to read from
 * @throws IOException if reading from the stream fails
 */
public DeleteResponse(ShardId shardId, StreamInput in) throws IOException {
    super(shardId, in);
    this.deletionStrategy = readDeletionStrategy(in);
}

/**
 * Reads a response from {@code in}, followed by the optional deletion strategy.
 *
 * @param in stream to read from
 * @throws IOException if reading from the stream fails
 */
public DeleteResponse(StreamInput in) throws IOException {
    super(in);
    this.deletionStrategy = readDeletionStrategy(in);
}

/**
 * Reads the optional deletion strategy: streams older than {@code V_3_0_0} carry nothing, newer streams
 * carry a presence flag followed by the strategy when the flag is set.
 *
 * @return the deserialized strategy, or {@code null} when the stream does not contain one
 */
private static InternalEngine.DeletionStrategy readDeletionStrategy(StreamInput in) throws IOException {
    // the version check must come first: older streams do not contain the presence flag at all
    if (in.getVersion().onOrAfter(V_3_0_0) && in.readBoolean()) {
        return new InternalEngine.DeletionStrategy(in);
    }
    return null;
}

/**
 * Creates a delete response that carries no deletion strategy.
 *
 * @param found {@code true} maps to {@code Result.DELETED}, {@code false} to {@code Result.NOT_FOUND}
 */
public DeleteResponse(ShardId shardId, String id, long seqNo, long primaryTerm, long version, boolean found) {
this(shardId, id, seqNo, primaryTerm, version, found ? Result.DELETED : Result.NOT_FOUND);
this(shardId, id, seqNo, primaryTerm, version, found ? Result.DELETED : Result.NOT_FOUND, null);
}

private DeleteResponse(ShardId shardId, String id, long seqNo, long primaryTerm, long version, Result result) {
/**
 * Creates a delete response that also carries the given deletion strategy.
 *
 * @param found            {@code true} maps to {@code Result.DELETED}, {@code false} to {@code Result.NOT_FOUND}
 * @param deletionStrategy strategy to expose through {@code writeStrategy()}; passed on unchanged
 */
public DeleteResponse(
ShardId shardId,
String id,
long seqNo,
long primaryTerm,
long version,
boolean found,
InternalEngine.DeletionStrategy deletionStrategy
) {
this(shardId, id, seqNo, primaryTerm, version, found ? Result.DELETED : Result.NOT_FOUND, deletionStrategy);
}

/**
 * Common constructor the other value constructors delegate to.
 *
 * @param result           passed through {@code assertDeletedOrNotFound} before reaching the superclass
 * @param deletionStrategy stored as-is in the {@code deletionStrategy} field
 */
private DeleteResponse(
ShardId shardId,
String id,
long seqNo,
long primaryTerm,
long version,
Result result,
InternalEngine.DeletionStrategy deletionStrategy
) {
super(shardId, id, seqNo, primaryTerm, version, assertDeletedOrNotFound(result));
this.deletionStrategy = deletionStrategy;
}

private static Result assertDeletedOrNotFound(Result result) {
Expand All @@ -93,6 +128,37 @@ public String toString() {
return builder.append("]").toString();
}

/**
 * Writes the thin form of the response, followed by the optional deletion strategy.
 *
 * @param out stream to write to
 * @throws IOException if writing to the stream fails
 */
@Override
public void writeThin(StreamOutput out) throws IOException {
    super.writeThin(out);
    writeDeletionStrategy(out);
}

/**
 * Writes the full form of the response, followed by the optional deletion strategy.
 *
 * @param out stream to write to
 * @throws IOException if writing to the stream fails
 */
@Override
public void writeTo(StreamOutput out) throws IOException {
    super.writeTo(out);
    writeDeletionStrategy(out);
}

/**
 * Writes the optional deletion strategy: nothing for streams older than {@code V_3_0_0}, otherwise a
 * presence flag followed by the strategy when it is set.
 */
private void writeDeletionStrategy(StreamOutput out) throws IOException {
    if (out.getVersion().onOrAfter(V_3_0_0)) {
        if (deletionStrategy == null) {
            out.writeBoolean(false);
        } else {
            out.writeBoolean(true);
            deletionStrategy.writeTo(out);
        }
    }
}

/**
 * Exposes the deletion strategy held by this response.
 *
 * @return the stored strategy; may be {@code null}
 */
@Override
public InternalEngine.DeletionStrategy writeStrategy() {
    return this.deletionStrategy;
}

public static DeleteResponse fromXContent(XContentParser parser) throws IOException {
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser);

Expand Down Expand Up @@ -122,7 +188,7 @@ public static class Builder extends DocWriteResponse.Builder {

@Override
public DeleteResponse build() {
DeleteResponse deleteResponse = new DeleteResponse(shardId, id, seqNo, primaryTerm, version, result);
DeleteResponse deleteResponse = new DeleteResponse(shardId, id, seqNo, primaryTerm, version, result, null);
deleteResponse.setForcedRefresh(forcedRefresh);
if (shardInfo != null) {
deleteResponse.setShardInfo(shardInfo);
Expand Down
Loading
Loading