Skip to content

Commit

Permalink
use indices stats api to check replica behind.
Browse files Browse the repository at this point in the history
Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>
  • Loading branch information
Rishikesh1159 committed Sep 12, 2023
1 parent cfd1fa2 commit 75fdc0f
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 68 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,6 @@
import org.opensearch.action.RealtimeRequest;
import org.opensearch.action.ValidateActions;
import org.opensearch.action.support.single.shard.SingleShardRequest;
import org.opensearch.cluster.routing.Preference;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.common.lucene.uid.Versions;
import org.opensearch.core.common.Strings;
import org.opensearch.core.common.io.stream.StreamInput;
Expand Down Expand Up @@ -159,8 +155,7 @@ public GetRequest routing(String routing) {
* will be used across different requests.
*/
public GetRequest preference(String preference) {
this.preference = FeatureFlags.isEnabled(FeatureFlags.SEGMENT_REPLICATION_EXPERIMENTAL) ? Preference.PRIMARY.type() : preference;
;
this.preference = preference;
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,7 @@
import org.opensearch.action.RealtimeRequest;
import org.opensearch.action.ValidateActions;
import org.opensearch.action.support.IndicesOptions;
import org.opensearch.cluster.routing.Preference;
import org.opensearch.common.Nullable;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.common.lucene.uid.Versions;
import org.opensearch.core.ParseField;
import org.opensearch.core.common.ParsingException;
Expand Down Expand Up @@ -323,8 +321,7 @@ public ActionRequestValidationException validate() {
* will be used across different requests.
*/
public MultiGetRequest preference(String preference) {
this.preference = FeatureFlags.isEnabled(FeatureFlags.SEGMENT_REPLICATION_EXPERIMENTAL) ? Preference.PRIMARY.type() : preference;
;
this.preference = preference;
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,6 @@

import org.opensearch.action.ActionRequestValidationException;
import org.opensearch.action.support.single.shard.SingleShardRequest;
import org.opensearch.cluster.routing.Preference;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;

Expand Down Expand Up @@ -79,9 +77,7 @@ public class MultiGetShardRequest extends SingleShardRequest<MultiGetShardReques
this.shardId = shardId;
locations = new ArrayList<>();
items = new ArrayList<>();
preference = FeatureFlags.isEnabled(FeatureFlags.SEGMENT_REPLICATION_EXPERIMENTAL)
? Preference.PRIMARY.type()
: multiGetRequest.preference;
preference = multiGetRequest.preference;
realtime = multiGetRequest.realtime;
refresh = multiGetRequest.refresh;
}
Expand All @@ -102,8 +98,7 @@ public int shardId() {
* will be used across different requests.
*/
public MultiGetShardRequest preference(String preference) {
this.preference = FeatureFlags.isEnabled(FeatureFlags.SEGMENT_REPLICATION_EXPERIMENTAL) ? Preference.PRIMARY.type() : preference;
;
this.preference = preference;
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,32 +9,28 @@
package org.opensearch.test.client;

import com.carrotsearch.randomizedtesting.generators.RandomPicks;

import org.apache.lucene.tests.util.TestUtil;
import org.junit.Assert;
import org.opensearch.action.ActionFuture;
import org.opensearch.action.admin.indices.replication.SegmentReplicationStatsResponse;
import org.opensearch.action.search.SearchRequestBuilder;
import org.opensearch.action.search.SearchType;
import org.opensearch.action.admin.indices.stats.IndicesStatsResponse;
import org.opensearch.action.search.SearchAction;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchRequestBuilder;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.action.search.SearchAction;
import org.opensearch.action.search.SearchType;
import org.opensearch.client.Client;
import org.opensearch.client.FilterClient;
import org.opensearch.cluster.routing.Preference;
import org.opensearch.common.action.ActionFuture;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.index.SegmentReplicationPerGroupStats;
import org.opensearch.index.SegmentReplicationShardStats;
import org.opensearch.indices.replication.SegmentReplicationState;
import org.junit.Assert;

import java.util.Map;
import java.util.List;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.Random;
import java.util.concurrent.TimeUnit;

import static org.junit.Assert.assertEquals;
import static org.opensearch.test.OpenSearchTestCase.assertBusy;
import static org.junit.Assert.assertTrue;

/** A {@link Client} that randomizes request parameters. */
public class SegmentReplicationClient extends FilterClient {
Expand Down Expand Up @@ -78,30 +74,9 @@ public SearchRequestBuilder prepareSearch(String... indices) {
// wait until replica shard is caught up before performing search request.
assertBusy(() -> {
for (String index : indexes) {
final SegmentReplicationStatsResponse segmentReplicationStatsResponse = this.admin()
.indices()
.prepareSegmentReplicationStats(index)
.execute()
.actionGet();
final Map<String, List<SegmentReplicationPerGroupStats>> replicationStats = segmentReplicationStatsResponse
.getReplicationStats();
for (Map.Entry<String, List<SegmentReplicationPerGroupStats>> perIndex : replicationStats.entrySet()) {
final List<SegmentReplicationPerGroupStats> value = perIndex.getValue();
for (SegmentReplicationPerGroupStats group : value) {
for (SegmentReplicationShardStats replicaStat : group.getReplicaStats()) {
logger.info(
"replica shard allocation id is:"
+ replicaStat.getAllocationId()
+ " and checkpoints beyond is: "
+ replicaStat.getCheckpointsBehindCount()
);
assertEquals(0, replicaStat.getCheckpointsBehindCount());
if (replicaStat.getCurrentReplicationState() != null) {
assertEquals(SegmentReplicationState.Stage.DONE, replicaStat.getCurrentReplicationState().getStage());
}
}
}
}
final IndicesStatsResponse indicesStatsResponse = this.admin().indices().prepareStats(index).execute().actionGet();

assertTrue(indicesStatsResponse.getIndex(index).getTotal().getSegments().getReplicationStats().maxBytesBehind == 0);
}
}, 30, TimeUnit.SECONDS);
} catch (Exception e) {
Expand Down Expand Up @@ -136,21 +111,9 @@ public ActionFuture<SearchResponse> search(SearchRequest request) {
// wait until replica shard is caught up before performing search request.
assertBusy(() -> {
for (String index : indexes) {
final SegmentReplicationStatsResponse segmentReplicationStatsResponse = this.admin()
.indices()
.prepareSegmentReplicationStats(index)
.execute()
.actionGet();
final Map<String, List<SegmentReplicationPerGroupStats>> replicationStats = segmentReplicationStatsResponse
.getReplicationStats();
for (Map.Entry<String, List<SegmentReplicationPerGroupStats>> perIndex : replicationStats.entrySet()) {
final List<SegmentReplicationPerGroupStats> value = perIndex.getValue();
for (SegmentReplicationPerGroupStats group : value) {
for (SegmentReplicationShardStats replicaStat : group.getReplicaStats()) {
assertEquals(0, replicaStat.getCheckpointsBehindCount());
}
}
}
final IndicesStatsResponse indicesStatsResponse = this.admin().indices().prepareStats(index).execute().actionGet();

assertTrue(indicesStatsResponse.getIndex(index).getTotal().getSegments().getReplicationStats().maxBytesBehind == 0);
}
});
} catch (Exception e) {
Expand Down

0 comments on commit 75fdc0f

Please sign in to comment.