Skip to content

Commit

Permalink
take 3
Browse files Browse the repository at this point in the history
Signed-off-by: Marc Handalian <marc.handalian@gmail.com>
  • Loading branch information
mch2 committed Nov 27, 2024
1 parent 3da97f2 commit 445c4d1
Show file tree
Hide file tree
Showing 13 changed files with 112 additions and 59 deletions.
1 change: 1 addition & 0 deletions gradle/run.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ testClusters {
plugin('plugins:'.concat(p))
}
}
systemProperty("opensearch.experimental.feature.read.write.split.enabled", "true")
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,42 +13,32 @@
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.junit.After;
import org.junit.Before;

import java.nio.file.Path;

import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class SearchReplicaReplicationIT extends SegmentReplicationBaseIT {

private static final String REPOSITORY_NAME = "test-remote-store-repo";
protected Path absolutePath;

private Boolean useRemoteStore;

@Before
public void randomizeRemoteStoreEnabled() {
useRemoteStore = randomBoolean();
}

@Override
protected Settings nodeSettings(int nodeOrdinal) {
if (useRemoteStore) {
if (absolutePath == null) {
absolutePath = randomRepoPath().toAbsolutePath();
}
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
.put(remoteStoreClusterSettings(REPOSITORY_NAME, absolutePath))
.build();
if (absolutePath == null) {
absolutePath = randomRepoPath().toAbsolutePath();
}
return super.nodeSettings(nodeOrdinal);
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
.put(remoteStoreClusterSettings(REPOSITORY_NAME, absolutePath))
.build();
}

@After
public void teardown() {
if (useRemoteStore) {
clusterAdmin().prepareCleanupRepository(REPOSITORY_NAME).get();
}
clusterAdmin().prepareCleanupRepository(REPOSITORY_NAME).get();

}

@Override
Expand Down Expand Up @@ -82,4 +72,39 @@ public void testReplication() throws Exception {
waitForSearchableDocs(docCount, primary, replica);
}

/**
 * Checks that documents indexed before a second data node exists are visible on that node
 * in three situations: right after the node joins, after the search replica count is set
 * to 0 and back to 1, and after the node is restarted.
 *
 * NOTE(review): the index settings used by {@code createIndex(INDEX_NAME)} are not visible
 * here; presumably the index is created with one search replica, which would explain why the
 * index is only yellow until the second data node joins - confirm against the class settings.
 *
 * @throws Exception if indexing or any cluster operation fails
 */
public void testRecoveryAfterDocsIndexed() throws Exception {
    internalCluster().startClusterManagerOnlyNode();
    // First data node; it is the only data node available while the documents are indexed.
    internalCluster().startDataOnlyNode();
    createIndex(INDEX_NAME);
    ensureYellowAndNoInitializingShards(INDEX_NAME);

    final int docCount = 10;
    int nextId = 0;
    while (nextId < docCount) {
        final String id = Integer.toString(nextId);
        client().prepareIndex(INDEX_NAME).setId(id).setSource("field", "value" + nextId).execute().get();
        nextId++;
    }
    refresh(INDEX_NAME);

    // Scenario 1: a second data node joins only after all documents were indexed and refreshed.
    final String searchNode = internalCluster().startDataOnlyNode();
    ensureGreen(INDEX_NAME);
    assertDocCounts(docCount, searchNode);

    // Scenario 2: remove every search replica, then add one back.
    final Settings.Builder withoutSearchReplicas = Settings.builder().put(SETTING_NUMBER_OF_SEARCH_REPLICAS, 0);
    client().admin().indices().prepareUpdateSettings(INDEX_NAME).setSettings(withoutSearchReplicas).get();
    ensureGreen(INDEX_NAME);

    final Settings.Builder withOneSearchReplica = Settings.builder().put(SETTING_NUMBER_OF_SEARCH_REPLICAS, 1);
    client().admin().indices().prepareUpdateSettings(INDEX_NAME).setSettings(withOneSearchReplica).get();
    ensureGreen(INDEX_NAME);
    assertDocCounts(docCount, searchNode);

    // Scenario 3: restart the second data node and verify the documents are still visible on it.
    internalCluster().restartNode(searchNode);
    ensureGreen(INDEX_NAME);
    assertDocCounts(docCount, searchNode);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1096,14 +1096,9 @@ static Settings aggregateIndexSettings(
private static void updateSearchOnlyReplicas(Settings requestSettings, Settings.Builder builder) {
if (INDEX_NUMBER_OF_SEARCH_REPLICAS_SETTING.exists(builder) && builder.get(SETTING_NUMBER_OF_SEARCH_REPLICAS) != null) {
if (INDEX_NUMBER_OF_SEARCH_REPLICAS_SETTING.get(requestSettings) > 0
&& ReplicationType.parseString(builder.get(INDEX_REPLICATION_TYPE_SETTING.getKey())).equals(ReplicationType.DOCUMENT)) {
&& Boolean.parseBoolean(builder.get(SETTING_REMOTE_STORE_ENABLED)) == false) {
throw new IllegalArgumentException(
"To set "
+ SETTING_NUMBER_OF_SEARCH_REPLICAS
+ ", "
+ INDEX_REPLICATION_TYPE_SETTING.getKey()
+ " must be set to "
+ ReplicationType.SEGMENT
"To set " + SETTING_NUMBER_OF_SEARCH_REPLICAS + ", " + SETTING_REMOTE_STORE_ENABLED + " must be set to true"
);
}
builder.put(SETTING_NUMBER_OF_SEARCH_REPLICAS, INDEX_NUMBER_OF_SEARCH_REPLICAS_SETTING.get(requestSettings));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@
import org.opensearch.index.IndexSettings;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.ShardLimitValidator;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.threadpool.ThreadPool;

import java.io.IOException;
Expand All @@ -77,8 +76,8 @@
import java.util.Set;

import static org.opensearch.action.support.ContextPreservingActionListener.wrapPreservingContext;
import static org.opensearch.cluster.metadata.IndexMetadata.INDEX_REPLICATION_TYPE_SETTING;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REMOTE_STORE_ENABLED;
import static org.opensearch.cluster.metadata.MetadataCreateIndexService.validateOverlap;
import static org.opensearch.cluster.metadata.MetadataCreateIndexService.validateRefreshIntervalSettings;
import static org.opensearch.cluster.metadata.MetadataCreateIndexService.validateTranslogDurabilitySettings;
Expand Down Expand Up @@ -538,14 +537,12 @@ public ClusterState execute(ClusterState currentState) {
private void validateSearchReplicaCountSettings(Settings requestSettings, Index[] indices, ClusterState currentState) {
final int updatedNumberOfSearchReplicas = IndexMetadata.INDEX_NUMBER_OF_SEARCH_REPLICAS_SETTING.get(requestSettings);
if (updatedNumberOfSearchReplicas > 0) {
if (Arrays.stream(indices).allMatch(index -> currentState.metadata().isSegmentReplicationEnabled(index.getName())) == false) {
if (Arrays.stream(indices)
.allMatch(
index -> currentState.metadata().index(index.getName()).getSettings().getAsBoolean(SETTING_REMOTE_STORE_ENABLED, false)
) == false) {
throw new IllegalArgumentException(
"To set "
+ SETTING_NUMBER_OF_SEARCH_REPLICAS
+ ", "
+ INDEX_REPLICATION_TYPE_SETTING.getKey()
+ " must be set to "
+ ReplicationType.SEGMENT
"To set " + SETTING_NUMBER_OF_SEARCH_REPLICAS + ", " + SETTING_REMOTE_STORE_ENABLED + " must be set to true"
);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -575,13 +575,7 @@ private Builder initializeAsRestore(
}
for (int i = 0; i < indexMetadata.getNumberOfSearchOnlyReplicas(); i++) {
indexShardRoutingBuilder.addShard(
ShardRouting.newUnassigned(
shardId,
false,
true,
PeerRecoverySource.INSTANCE, // TODO: Update to remote store if enabled
unassignedInfo
)
ShardRouting.newUnassigned(shardId, false, true, EmptyStoreRecoverySource.INSTANCE, unassignedInfo)
);
}
shards.put(shardNumber, indexShardRoutingBuilder.build());
Expand Down Expand Up @@ -624,13 +618,7 @@ private Builder initializeEmpty(IndexMetadata indexMetadata, UnassignedInfo unas
}
for (int i = 0; i < indexMetadata.getNumberOfSearchOnlyReplicas(); i++) {
indexShardRoutingBuilder.addShard(
ShardRouting.newUnassigned(
shardId,
false,
true,
PeerRecoverySource.INSTANCE, // TODO: Update to remote store if enabled
unassignedInfo
)
ShardRouting.newUnassigned(shardId, false, true, EmptyStoreRecoverySource.INSTANCE, unassignedInfo)
);
}
shards.put(shardNumber, indexShardRoutingBuilder.build());
Expand All @@ -645,7 +633,7 @@ public Builder addReplica() {
ShardRouting shard = ShardRouting.newUnassigned(
shardId,
false,
PeerRecoverySource.INSTANCE,
EmptyStoreRecoverySource.INSTANCE,
new UnassignedInfo(UnassignedInfo.Reason.REPLICA_ADDED, null)
);
shards.put(shardNumber, new IndexShardRoutingTable.Builder(shards.get(shard.id())).addShard(shard).build());
Expand All @@ -665,7 +653,7 @@ public Builder addSearchReplica() {
shardId,
false,
true,
PeerRecoverySource.INSTANCE, // TODO: Change to remote store if enabled
EmptyStoreRecoverySource.INSTANCE,
new UnassignedInfo(UnassignedInfo.Reason.REPLICA_ADDED, null)
);
shards.put(shardNumber, new IndexShardRoutingTable.Builder(shards.get(shard.id())).addShard(shard).build());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,9 +134,25 @@ public class IndexShardRoutingTable extends AbstractDiffable<IndexShardRoutingTa
if (shard.initializing()) {
allInitializingShards.add(shard);
}
if (shard.isSearchOnly()) {
// Mark search only shards as initializing or assigned, but do not add them to
// the allAllocationIds set. Cluster Manager will filter search replica allocation ids out of
// the in-sync set sent to primaries, but they are still included in the routing table.
// This ensures the primaries neither validate that these ids exist in tracking nor include
// them in the unavailableInSyncShards set.
if (shard.relocating()) {
allInitializingShards.add(shard.getTargetRelocatingShard());
assignedShards.add(shard.getTargetRelocatingShard());
}
if (shard.assignedToNode()) {
assignedShards.add(shard);
}
continue;
}
if (shard.relocating()) {
// create the target initializing shard routing on the node the shard is relocating to
allInitializingShards.add(shard.getTargetRelocatingShard());
// search replicas should not be tracked via in sync allocation ids.
allAllocationIds.add(shard.getTargetRelocatingShard().allocationId().getId());

assert shard.assignedToNode() : "relocating from unassigned " + shard;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ protected ShardRouting(
assert !(state == ShardRoutingState.UNASSIGNED && unassignedInfo == null) : "unassigned shard must be created with meta";
assert (state == ShardRoutingState.UNASSIGNED || state == ShardRoutingState.INITIALIZING) == (recoverySource != null)
: "recovery source only available on unassigned or initializing shard but was " + state;
assert recoverySource == null || recoverySource == PeerRecoverySource.INSTANCE || primary
assert recoverySource == null || recoverySource == PeerRecoverySource.INSTANCE || primary || searchOnly
: "replica shards always recover from primary";
assert (currentNodeId == null) == (state == ShardRoutingState.UNASSIGNED) : "unassigned shard must not be assigned to a node "
+ this;
Expand Down Expand Up @@ -442,6 +442,8 @@ public ShardRouting moveToUnassigned(UnassignedInfo unassignedInfo) {
if (active()) {
if (primary()) {
recoverySource = ExistingStoreRecoverySource.INSTANCE;
} else if (isSearchOnly()) {
recoverySource = RecoverySource.ExistingStoreRecoverySource.INSTANCE;
} else {
recoverySource = PeerRecoverySource.INSTANCE;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,11 @@ public void shardStarted(ShardRouting initializingShard, ShardRouting startedSha
+ startedShard.allocationId().getId()
+ "] have to have the same";
Updates updates = changes(startedShard.shardId());
updates.addedAllocationIds.add(startedShard.allocationId().getId());
// if the started shard is an untracked replica, don't bother sending it as part of the
// in sync id set.
if (startedShard.isSearchOnly() == false) {
updates.addedAllocationIds.add(startedShard.allocationId().getId());
}
if (startedShard.primary()
// started shard has to have null recoverySource; have to pick up recoverySource from its initializing state
&& (initializingShard.recoverySource() == RecoverySource.ExistingStoreRecoverySource.FORCE_STALE_PRIMARY_INSTANCE)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,12 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing
return allocation.decision(YES, NAME, "below primary recovery limit of [%d]", primariesInitialRecoveries);
}
} else {
if (shardRouting.isSearchOnly()) {
// Search replicas recover from store and trigger a round of segRep before being marked active.
// The replication source can be either another node (node-to-node replication) or the remote store,
// so rely on the replication throttling mechanisms instead of throttling here.
return allocation.decision(YES, NAME, "Do not throttle search replica recovery");
}
// Peer recovery
assert initializingShard(shardRouting, node.nodeId()).recoverySource().getType() == RecoverySource.Type.PEER;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1462,6 +1462,7 @@ public synchronized void updateFromClusterManager(
// remove entries which don't exist on cluster-manager
Set<String> initializingAllocationIds = routingTable.getAllInitializingShards()
.stream()
.filter(r -> r.isSearchOnly() == false)
.map(ShardRouting::allocationId)
.map(AllocationId::getId)
.collect(Collectors.toSet());
Expand Down
17 changes: 16 additions & 1 deletion server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -2543,6 +2543,20 @@ assert routingEntry().recoverySource().getType() == RecoverySource.Type.SNAPSHOT
getEngine().translogManager().skipTranslogRecovery();
}

/**
* Opens the engine for pull based replica copies that are
* not primary eligible. This will skip any checkpoint tracking and ensure
* that shards are sync'd with remote store before opening.
* <p>
* The shard's routing entry must be search only (checked by assertion) and the recovery
* must currently be in the {@code INDEX} stage. The method moves the recovery state to the
* {@code TRANSLOG} stage and then opens the engine; it does not itself invoke any translog
* recovery call.
* <p>
* NOTE(review): the "skip checkpoint tracking" and "sync'd with remote store" statements above
* depend on the behavior of {@code innerOpenEngineAndTranslog}, which is not visible here - confirm.
*
* @throws IOException declared by this method; presumably raised while checking the index or
*                     opening the engine - TODO confirm which callee throws
*/
void openEngineForUntrackedReplica() throws IOException {
// Guard: only search only routing entries may use this path (enforced only when assertions are enabled).
assert routingEntry().isSearchOnly() : "not a search only replica [" + routingEntry() + "]";
// The recovery state is expected to be in the INDEX stage when this method is entered.
recoveryState.validateCurrentStage(RecoveryState.Stage.INDEX);
maybeCheckIndex();
// Advance the recovery state to TRANSLOG and re-validate it before opening the engine.
recoveryState.setStage(RecoveryState.Stage.TRANSLOG);
recoveryState.validateCurrentStage(RecoveryState.Stage.TRANSLOG);
// Same arguments the single-argument innerOpenEngineAndTranslog overload forwards (supplier, true).
// NOTE(review): presumably the boolean requests a sync from the remote store - confirm.
innerOpenEngineAndTranslog(replicationTracker, true);
}

private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier) throws IOException {
innerOpenEngineAndTranslog(globalCheckpointSupplier, true);
}
Expand Down Expand Up @@ -2889,7 +2903,8 @@ public void recoverFromLocalShards(
public void recoverFromStore(ActionListener<Boolean> listener) {
// we are the first primary, recover from the gateway
// if its post api allocation, the index should exists
assert shardRouting.primary() : "recover from store only makes sense if the shard is a primary shard";
assert shardRouting.primary() || shardRouting.isSearchOnly()
: "recover from store only makes sense if the shard is a primary shard or an untracked search only replica";
assert shardRouting.initializing() : "can only start recovery on initializing shard";
StoreRecovery storeRecovery = new StoreRecovery(shardId, logger);
storeRecovery.recoverFromStore(this, listener);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -544,7 +544,7 @@ private boolean canRecover(IndexShard indexShard) {
// got closed on us, just ignore this recovery
return false;
}
if (indexShard.routingEntry().primary() == false) {
if (indexShard.routingEntry().primary() == false && indexShard.routingEntry().isSearchOnly() == false) {
throw new IndexShardRecoveryException(shardId, "Trying to recover when the shard is in backup state", null);
}
return true;
Expand Down Expand Up @@ -747,7 +747,11 @@ private void internalRecoverFromStore(IndexShard indexShard) throws IndexShardRe
writeEmptyRetentionLeasesFile(indexShard);
indexShard.recoveryState().getIndex().setFileDetailsComplete();
}
indexShard.openEngineAndRecoverFromTranslog();
if (indexShard.routingEntry().isSearchOnly() == false) {
indexShard.openEngineAndRecoverFromTranslog();
} else {
indexShard.openEngineForUntrackedReplica();
}
if (indexShard.shouldSeedRemoteStore()) {
indexShard.getThreadPool().executor(ThreadPool.Names.GENERIC).execute(() -> {
logger.info("Attempting to seed Remote Store via local recovery for {}", indexShard.shardId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -264,8 +264,7 @@ private IndexService newIndexService(IndexModule module) throws IOException {
translogFactorySupplier,
() -> IndexSettings.DEFAULT_REFRESH_INTERVAL,
DefaultRecoverySettings.INSTANCE,
DefaultRemoteStoreSettings.INSTANCE,
s -> {}
DefaultRemoteStoreSettings.INSTANCE
);
}

Expand Down

0 comments on commit 445c4d1

Please sign in to comment.