Skip to content

Commit

Permalink
Update search only replica recovery flow
Browse files Browse the repository at this point in the history
This PR includes multiple changes to search replica recovery.
1. Change search only replica copies to recover as empty store instead of PEER. This will run a store recovery that syncs segments from remote store directly and eliminate any primary communication.
2. Remove search replicas from the in-sync allocation ID set and update routing table to exclude them from allAllocationIds.  This ensures primaries aren't tracking or validating the routing table for any search replica's presence.
3. Change search replica validation to require remote store.  There are versions of the above changes that are still possible with primary based node-node replication, but I don't think they are worth making  at this time.

Signed-off-by: Marc Handalian <marc.handalian@gmail.com>
  • Loading branch information
mch2 committed Dec 2, 2024
1 parent 3da97f2 commit a932d59
Show file tree
Hide file tree
Showing 14 changed files with 205 additions and 220 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.remotestore.RemoteStoreBaseIntegTestCase;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.util.List;
Expand All @@ -23,7 +24,7 @@
import static org.opensearch.cluster.routing.allocation.decider.SearchReplicaAllocationDecider.SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class SearchReplicaFilteringAllocationIT extends OpenSearchIntegTestCase {
public class SearchReplicaFilteringAllocationIT extends RemoteStoreBaseIntegTestCase {

@Override
protected Settings featureFlagSettings() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,42 +13,32 @@
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.junit.After;
import org.junit.Before;

import java.nio.file.Path;

import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class SearchReplicaReplicationIT extends SegmentReplicationBaseIT {

private static final String REPOSITORY_NAME = "test-remote-store-repo";
protected Path absolutePath;

private Boolean useRemoteStore;

@Before
public void randomizeRemoteStoreEnabled() {
useRemoteStore = randomBoolean();
}

@Override
protected Settings nodeSettings(int nodeOrdinal) {
if (useRemoteStore) {
if (absolutePath == null) {
absolutePath = randomRepoPath().toAbsolutePath();
}
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
.put(remoteStoreClusterSettings(REPOSITORY_NAME, absolutePath))
.build();
if (absolutePath == null) {
absolutePath = randomRepoPath().toAbsolutePath();
}
return super.nodeSettings(nodeOrdinal);
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
.put(remoteStoreClusterSettings(REPOSITORY_NAME, absolutePath))
.build();
}

@After
public void teardown() {
if (useRemoteStore) {
clusterAdmin().prepareCleanupRepository(REPOSITORY_NAME).get();
}
clusterAdmin().prepareCleanupRepository(REPOSITORY_NAME).get();

}

@Override
Expand Down Expand Up @@ -82,4 +72,39 @@ public void testReplication() throws Exception {
waitForSearchableDocs(docCount, primary, replica);
}

public void testRecoveryAfterDocsIndexed() throws Exception {
internalCluster().startClusterManagerOnlyNode();
final String primary = internalCluster().startDataOnlyNode();
createIndex(INDEX_NAME);
ensureYellowAndNoInitializingShards(INDEX_NAME);
final int docCount = 10;
for (int i = 0; i < docCount; i++) {
client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().get();
}
refresh(INDEX_NAME);

final String replica = internalCluster().startDataOnlyNode();
ensureGreen(INDEX_NAME);
assertDocCounts(10, replica);

client().admin()
.indices()
.prepareUpdateSettings(INDEX_NAME)
.setSettings(Settings.builder().put(SETTING_NUMBER_OF_SEARCH_REPLICAS, 0))
.get();

ensureGreen(INDEX_NAME);

client().admin()
.indices()
.prepareUpdateSettings(INDEX_NAME)
.setSettings(Settings.builder().put(SETTING_NUMBER_OF_SEARCH_REPLICAS, 1))
.get();
ensureGreen(INDEX_NAME);
assertDocCounts(10, replica);

internalCluster().restartNode(replica);
ensureGreen(INDEX_NAME);
assertDocCounts(10, replica);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.snapshots.AbstractSnapshotIntegTestCase;
import org.opensearch.remotestore.RemoteSnapshotIT;
import org.opensearch.snapshots.SnapshotRestoreException;
import org.opensearch.test.OpenSearchIntegTestCase;

Expand All @@ -26,7 +26,7 @@
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class SearchReplicaRestoreIT extends AbstractSnapshotIntegTestCase {
public class SearchReplicaRestoreIT extends RemoteSnapshotIT {

private static final String INDEX_NAME = "test-idx-1";
private static final String RESTORED_INDEX_NAME = INDEX_NAME + "-restored";
Expand All @@ -40,49 +40,6 @@ protected Settings featureFlagSettings() {
return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.READER_WRITER_SPLIT_EXPERIMENTAL, true).build();
}

public void testSearchReplicaRestore_WhenSnapshotOnDocRep_RestoreOnDocRepWithSearchReplica() throws Exception {
bootstrapIndexWithOutSearchReplicas(ReplicationType.DOCUMENT);
createRepoAndSnapshot(REPOSITORY_NAME, FS_REPOSITORY_TYPE, SNAPSHOT_NAME, INDEX_NAME);

SnapshotRestoreException exception = expectThrows(
SnapshotRestoreException.class,
() -> restoreSnapshot(
REPOSITORY_NAME,
SNAPSHOT_NAME,
INDEX_NAME,
RESTORED_INDEX_NAME,
Settings.builder()
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.DOCUMENT)
.put(SETTING_NUMBER_OF_SEARCH_REPLICAS, 1)
.build()
)
);
assertTrue(exception.getMessage().contains(getSnapshotExceptionMessage(ReplicationType.DOCUMENT, ReplicationType.DOCUMENT)));
}

public void testSearchReplicaRestore_WhenSnapshotOnDocRep_RestoreOnSegRepWithSearchReplica() throws Exception {
bootstrapIndexWithOutSearchReplicas(ReplicationType.DOCUMENT);
createRepoAndSnapshot(REPOSITORY_NAME, FS_REPOSITORY_TYPE, SNAPSHOT_NAME, INDEX_NAME);

restoreSnapshot(
REPOSITORY_NAME,
SNAPSHOT_NAME,
INDEX_NAME,
RESTORED_INDEX_NAME,
Settings.builder()
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
.put(SETTING_NUMBER_OF_SEARCH_REPLICAS, 1)
.build()
);
ensureYellowAndNoInitializingShards(RESTORED_INDEX_NAME);
internalCluster().startDataOnlyNode();
ensureGreen(RESTORED_INDEX_NAME);
assertEquals(1, getNumberOfSearchReplicas(RESTORED_INDEX_NAME));

SearchResponse resp = client().prepareSearch(RESTORED_INDEX_NAME).setQuery(QueryBuilders.matchAllQuery()).get();
assertHitCount(resp, DOC_COUNT);
}

public void testSearchReplicaRestore_WhenSnapshotOnSegRep_RestoreOnDocRepWithSearchReplica() throws Exception {
bootstrapIndexWithOutSearchReplicas(ReplicationType.SEGMENT);
createRepoAndSnapshot(REPOSITORY_NAME, FS_REPOSITORY_TYPE, SNAPSHOT_NAME, INDEX_NAME);
Expand Down Expand Up @@ -140,27 +97,6 @@ public void testSearchReplicaRestore_WhenSnapshotOnSegRepWithSearchReplica_Resto
assertTrue(exception.getMessage().contains(getSnapshotExceptionMessage(ReplicationType.SEGMENT, ReplicationType.DOCUMENT)));
}

public void testSearchReplicaRestore_WhenSnapshotOnSegRepWithSearchReplica_RestoreOnDocRepWithNoSearchReplica() throws Exception {
bootstrapIndexWithSearchReplicas();
createRepoAndSnapshot(REPOSITORY_NAME, FS_REPOSITORY_TYPE, SNAPSHOT_NAME, INDEX_NAME);

restoreSnapshot(
REPOSITORY_NAME,
SNAPSHOT_NAME,
INDEX_NAME,
RESTORED_INDEX_NAME,
Settings.builder()
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.DOCUMENT)
.put(SETTING_NUMBER_OF_SEARCH_REPLICAS, 0)
.build()
);
ensureGreen(RESTORED_INDEX_NAME);
assertEquals(0, getNumberOfSearchReplicas(RESTORED_INDEX_NAME));

SearchResponse resp = client().prepareSearch(RESTORED_INDEX_NAME).setQuery(QueryBuilders.matchAllQuery()).get();
assertHitCount(resp, DOC_COUNT);
}

private void bootstrapIndexWithOutSearchReplicas(ReplicationType replicationType) throws InterruptedException {
startCluster(2);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.remotestore.RemoteStoreBaseIntegTestCase;
import org.opensearch.test.InternalTestCluster;
import org.opensearch.test.OpenSearchIntegTestCase;

Expand All @@ -31,7 +32,7 @@
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class SearchOnlyReplicaIT extends OpenSearchIntegTestCase {
public class SearchOnlyReplicaIT extends RemoteStoreBaseIntegTestCase {

private static final String TEST_INDEX = "test_index";

Expand All @@ -55,35 +56,6 @@ public Settings indexSettings() {
.build();
}

public void testCreateDocRepFails() {
Settings settings = Settings.builder().put(indexSettings()).put(SETTING_REPLICATION_TYPE, ReplicationType.DOCUMENT).build();

IllegalArgumentException illegalArgumentException = expectThrows(
IllegalArgumentException.class,
() -> createIndex(TEST_INDEX, settings)
);
assertEquals(expectedFailureMessage, illegalArgumentException.getMessage());
}

public void testUpdateDocRepFails() {
Settings settings = Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(SETTING_REPLICATION_TYPE, ReplicationType.DOCUMENT)
.build();
// create succeeds
createIndex(TEST_INDEX, settings);

// update fails
IllegalArgumentException illegalArgumentException = expectThrows(IllegalArgumentException.class, () -> {
client().admin()
.indices()
.prepareUpdateSettings(TEST_INDEX)
.setSettings(Settings.builder().put(SETTING_NUMBER_OF_SEARCH_REPLICAS, 1))
.get();
});
assertEquals(expectedFailureMessage, illegalArgumentException.getMessage());
}

public void testFailoverWithSearchReplica_WithWriterReplicas() throws IOException {
int numSearchReplicas = 1;
int numWriterReplicas = 1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1096,14 +1096,9 @@ static Settings aggregateIndexSettings(
private static void updateSearchOnlyReplicas(Settings requestSettings, Settings.Builder builder) {
if (INDEX_NUMBER_OF_SEARCH_REPLICAS_SETTING.exists(builder) && builder.get(SETTING_NUMBER_OF_SEARCH_REPLICAS) != null) {
if (INDEX_NUMBER_OF_SEARCH_REPLICAS_SETTING.get(requestSettings) > 0
&& ReplicationType.parseString(builder.get(INDEX_REPLICATION_TYPE_SETTING.getKey())).equals(ReplicationType.DOCUMENT)) {
&& Boolean.parseBoolean(builder.get(SETTING_REMOTE_STORE_ENABLED)) == false) {
throw new IllegalArgumentException(
"To set "
+ SETTING_NUMBER_OF_SEARCH_REPLICAS
+ ", "
+ INDEX_REPLICATION_TYPE_SETTING.getKey()
+ " must be set to "
+ ReplicationType.SEGMENT
"To set " + SETTING_NUMBER_OF_SEARCH_REPLICAS + ", " + SETTING_REMOTE_STORE_ENABLED + " must be set to true"
);
}
builder.put(SETTING_NUMBER_OF_SEARCH_REPLICAS, INDEX_NUMBER_OF_SEARCH_REPLICAS_SETTING.get(requestSettings));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@
import org.opensearch.index.IndexSettings;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.ShardLimitValidator;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.threadpool.ThreadPool;

import java.io.IOException;
Expand All @@ -77,8 +76,8 @@
import java.util.Set;

import static org.opensearch.action.support.ContextPreservingActionListener.wrapPreservingContext;
import static org.opensearch.cluster.metadata.IndexMetadata.INDEX_REPLICATION_TYPE_SETTING;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REMOTE_STORE_ENABLED;
import static org.opensearch.cluster.metadata.MetadataCreateIndexService.validateOverlap;
import static org.opensearch.cluster.metadata.MetadataCreateIndexService.validateRefreshIntervalSettings;
import static org.opensearch.cluster.metadata.MetadataCreateIndexService.validateTranslogDurabilitySettings;
Expand Down Expand Up @@ -538,14 +537,12 @@ public ClusterState execute(ClusterState currentState) {
private void validateSearchReplicaCountSettings(Settings requestSettings, Index[] indices, ClusterState currentState) {
final int updatedNumberOfSearchReplicas = IndexMetadata.INDEX_NUMBER_OF_SEARCH_REPLICAS_SETTING.get(requestSettings);
if (updatedNumberOfSearchReplicas > 0) {
if (Arrays.stream(indices).allMatch(index -> currentState.metadata().isSegmentReplicationEnabled(index.getName())) == false) {
if (Arrays.stream(indices)
.allMatch(
index -> currentState.metadata().index(index.getName()).getSettings().getAsBoolean(SETTING_REMOTE_STORE_ENABLED, false)
) == false) {
throw new IllegalArgumentException(
"To set "
+ SETTING_NUMBER_OF_SEARCH_REPLICAS
+ ", "
+ INDEX_REPLICATION_TYPE_SETTING.getKey()
+ " must be set to "
+ ReplicationType.SEGMENT
"To set " + SETTING_NUMBER_OF_SEARCH_REPLICAS + ", " + SETTING_REMOTE_STORE_ENABLED + " must be set to true"
);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -575,13 +575,7 @@ private Builder initializeAsRestore(
}
for (int i = 0; i < indexMetadata.getNumberOfSearchOnlyReplicas(); i++) {
indexShardRoutingBuilder.addShard(
ShardRouting.newUnassigned(
shardId,
false,
true,
PeerRecoverySource.INSTANCE, // TODO: Update to remote store if enabled
unassignedInfo
)
ShardRouting.newUnassigned(shardId, false, true, EmptyStoreRecoverySource.INSTANCE, unassignedInfo)
);
}
shards.put(shardNumber, indexShardRoutingBuilder.build());
Expand Down Expand Up @@ -624,13 +618,7 @@ private Builder initializeEmpty(IndexMetadata indexMetadata, UnassignedInfo unas
}
for (int i = 0; i < indexMetadata.getNumberOfSearchOnlyReplicas(); i++) {
indexShardRoutingBuilder.addShard(
ShardRouting.newUnassigned(
shardId,
false,
true,
PeerRecoverySource.INSTANCE, // TODO: Update to remote store if enabled
unassignedInfo
)
ShardRouting.newUnassigned(shardId, false, true, EmptyStoreRecoverySource.INSTANCE, unassignedInfo)
);
}
shards.put(shardNumber, indexShardRoutingBuilder.build());
Expand Down Expand Up @@ -665,7 +653,7 @@ public Builder addSearchReplica() {
shardId,
false,
true,
PeerRecoverySource.INSTANCE, // TODO: Change to remote store if enabled
EmptyStoreRecoverySource.INSTANCE,
new UnassignedInfo(UnassignedInfo.Reason.REPLICA_ADDED, null)
);
shards.put(shardNumber, new IndexShardRoutingTable.Builder(shards.get(shard.id())).addShard(shard).build());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,21 @@ public class IndexShardRoutingTable extends AbstractDiffable<IndexShardRoutingTa
if (shard.initializing()) {
allInitializingShards.add(shard);
}
if (shard.isSearchOnly()) {
// mark search only shards as initializing or assigned, but do not add to
// the allAllocationId set. Cluster Manager will filter out search replica aIds in
// the in-sync set that is sent to primaries, but they are still included in the routing table.
// This ensures the primaries do not validate these ids exist in tracking nor are included
// in the unavailableInSyncShards set.
if (shard.relocating()) {
allInitializingShards.add(shard.getTargetRelocatingShard());
assignedShards.add(shard.getTargetRelocatingShard());
}
if (shard.assignedToNode()) {
assignedShards.add(shard);
}
continue;
}
if (shard.relocating()) {
// create the target initializing shard routing on the node the shard is relocating to
allInitializingShards.add(shard.getTargetRelocatingShard());
Expand Down
Loading

0 comments on commit a932d59

Please sign in to comment.