Skip to content

Commit

Permalink
Upstream Fetch
Browse files Browse the repository at this point in the history
Signed-off-by: Prudhvi Godithi <pgodithi@amazon.com>
  • Loading branch information
prudhvigodithi committed Feb 21, 2025
1 parent 6e2bb1d commit 62d23ab
Show file tree
Hide file tree
Showing 10 changed files with 1,068 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,6 @@
* compatible open source license.
*/

/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.admin.indices.scale.searchonly;

import org.opensearch.action.admin.indices.settings.get.GetSettingsResponse;
Expand Down Expand Up @@ -115,17 +107,18 @@ public void testScaleDownToSearchOnly() throws Exception {
try {
client().prepareIndex(TEST_INDEX).setId("new-doc").setSource("field1", "new-value").get();
fail("Expected ClusterBlockException");
} catch (ClusterBlockException e) {
// Expected exception
}
} catch (ClusterBlockException ignored) {}

// Verify routing table structure
assertEquals(0, getClusterState().routingTable().index(TEST_INDEX).shard(0).writerReplicas().size());
assertEquals(
1,
getClusterState().routingTable().index(TEST_INDEX).shard(0).searchOnlyReplicas().stream().filter(ShardRouting::active).count()
);
assertNull(shardTable.primaryShard());
assertBusy(() -> {
IndexShardRoutingTable currentShardTable = getClusterState().routingTable().index(TEST_INDEX).shard(0);
assertNull("Primary shard should be null after scale-down", currentShardTable.primaryShard());
});

}

Expand All @@ -144,11 +137,12 @@ public void testScaleUpFromSearchOnly() throws Exception {
ensureGreen(TEST_INDEX);

for (int i = 0; i < 5; i++) {
client().prepareIndex(TEST_INDEX)
IndexResponse indexResponse = client().prepareIndex(TEST_INDEX)
.setId(Integer.toString(i))
.setSource("field1", "value" + i)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.get();
assertEquals(RestStatus.CREATED, indexResponse.status());
}

// Verify initial state has expected replica counts
Expand Down Expand Up @@ -203,7 +197,7 @@ public void testScaleUpFromSearchOnly() throws Exception {

// Verify we can search existing data
SearchResponse searchResponse = client().prepareSearch(TEST_INDEX).get();
assertHitCount(searchResponse, 3);
assertHitCount(searchResponse, 5);

// Verify we can write to the index again
IndexResponse indexResponse = client().prepareIndex(TEST_INDEX)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,11 @@

package org.opensearch.indices.replication;

import org.opensearch.action.admin.cluster.remotestore.restore.RestoreRemoteStoreRequest;
import org.opensearch.action.admin.cluster.remotestore.restore.RestoreRemoteStoreResponse;
import org.opensearch.action.admin.indices.settings.get.GetSettingsResponse;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.common.settings.Settings;
Expand All @@ -21,7 +25,9 @@

import java.util.List;

import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;

Expand Down Expand Up @@ -102,8 +108,8 @@ private void bootstrapIndexWithOutSearchReplicas(ReplicationType replicationType

Settings settings = Settings.builder()
.put(super.indexSettings())
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
.put(SETTING_NUMBER_OF_SHARDS, 1)
.put(SETTING_NUMBER_OF_REPLICAS, 1)
.put(SETTING_NUMBER_OF_SEARCH_REPLICAS, 0)
.put(IndexMetadata.SETTING_REPLICATION_TYPE, replicationType)
.build();
Expand All @@ -114,13 +120,29 @@ private void bootstrapIndexWithOutSearchReplicas(ReplicationType replicationType
ensureGreen(INDEX_NAME);
}

public void testRemoteStoreRestoreFailsForSearchOnlyIndex() throws Exception {
bootstrapIndexWithSearchReplicas();
assertAcked(client().admin().indices().prepareSearchOnly(INDEX_NAME).setScaleDown(true).get());

GetSettingsResponse settingsResponse = client().admin().indices().prepareGetSettings(INDEX_NAME).get();
assertEquals("true", settingsResponse.getSetting(INDEX_NAME, IndexMetadata.INDEX_BLOCKS_SEARCH_ONLY_SETTING.getKey()));

IllegalArgumentException exception = expectThrows(IllegalArgumentException.class, () -> {
PlainActionFuture<RestoreRemoteStoreResponse> future = PlainActionFuture.newFuture();
client().admin().cluster().restoreRemoteStore(new RestoreRemoteStoreRequest().indices(INDEX_NAME), future);
future.actionGet();
});

assertTrue(exception.getMessage().contains("Cannot restore index [" + INDEX_NAME + "] because search-only mode is enabled"));
}

private void bootstrapIndexWithSearchReplicas() throws InterruptedException {
startCluster(3);

Settings settings = Settings.builder()
.put(super.indexSettings())
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
.put(SETTING_NUMBER_OF_SHARDS, 1)
.put(SETTING_NUMBER_OF_REPLICAS, 1)
.put(SETTING_NUMBER_OF_SEARCH_REPLICAS, 1)
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
* index state compatibility, and configuration prerequisites such as remote store
* and segment replication settings.
*/
class ScaleOperationValidator {
class SearchOnlyOperationValidator {

/**
* Validates that the given index meets the prerequisites for the scale operation.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public class TransportSearchOnlyAction extends TransportClusterManagerNodeAction
private final IndicesService indicesService;
private final TransportService transportService;

private final ScaleOperationValidator validator;
private final SearchOnlyOperationValidator validator;
private final SearchOnlyClusterStateBuilder searchOnlyClusterStateBuilder;
private final SearchOnlyShardSyncManager searchOnlyShardSyncManager;

Expand Down Expand Up @@ -131,7 +131,7 @@ public TransportSearchOnlyAction(
this.allocationService = allocationService;
this.indicesService = indicesService;
this.transportService = transportService;
this.validator = new ScaleOperationValidator();
this.validator = new SearchOnlyOperationValidator();
this.searchOnlyClusterStateBuilder = new SearchOnlyClusterStateBuilder();
this.searchOnlyShardSyncManager = new SearchOnlyShardSyncManager(clusterService, transportService, NAME);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,40 +167,31 @@ public RemoteRestoreResult restore(
IndicesOptions.fromOptions(true, true, true, true)
);

boolean allSearchOnly = true;
for (String indexName : filteredIndices) {
IndexMetadata indexMetadata = currentState.metadata().index(indexName);
if (indexMetadata == null) {
logger.warn("Skipping restore: index [{}] does not exist.", indexName);
logger.warn("Index restore is not supported for non-existent index. Skipping: {}", indexName);
continue;

Check warning on line 174 in server/src/main/java/org/opensearch/index/recovery/RemoteStoreRestoreService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/recovery/RemoteStoreRestoreService.java#L174

Added line #L174 was not covered by tests
}

boolean isSearchOnly = indexMetadata.getSettings()
.getAsBoolean(IndexMetadata.INDEX_BLOCKS_SEARCH_ONLY_SETTING.getKey(), false);

Check warning on line 177 in server/src/main/java/org/opensearch/index/recovery/RemoteStoreRestoreService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/recovery/RemoteStoreRestoreService.java#L176-L177

Added lines #L176 - L177 were not covered by tests

if (isSearchOnly) {
logger.warn("Skipping _remotestore/_restore for index [{}] as search-only mode is enabled.", indexName);
} else if (restoreAllShards && IndexMetadata.State.CLOSE.equals(indexMetadata.getState()) == false) {
throw new IllegalArgumentException(
String.format(Locale.ROOT, "Cannot restore index [%s] because search-only mode is enabled", indexName)

Check warning on line 180 in server/src/main/java/org/opensearch/index/recovery/RemoteStoreRestoreService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/recovery/RemoteStoreRestoreService.java#L179-L180

Added lines #L179 - L180 were not covered by tests
);
}
if (restoreAllShards && IndexMetadata.State.CLOSE.equals(indexMetadata.getState()) == false) {
throw new IllegalStateException(
String.format(
Locale.ROOT,
"cannot restore index [%s] because an open index with same name/uuid already exists in the cluster.",
indexName
) + " Close the existing index."
);
} else {
allSearchOnly = false;
indexMetadataMap.put(indexName, new Tuple<>(false, indexMetadata));
}
}

if (allSearchOnly) {
throw new IllegalArgumentException(
"Skipping _remotestore/_restore for all selected indices as search-only mode is enabled."
);
indexMetadataMap.put(indexName, new Tuple<>(false, indexMetadata));

Check warning on line 192 in server/src/main/java/org/opensearch/index/recovery/RemoteStoreRestoreService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/recovery/RemoteStoreRestoreService.java#L192

Added line #L192 was not covered by tests
}
}

return executeRestore(currentState, indexMetadataMap, restoreAllShards, remoteState);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1185,9 +1185,9 @@ public void removeIndex(final Index index, final IndexRemovalReason reason, fina
}

listener.beforeIndexRemoved(indexService, reason);
logger.debug("{} closing index service (reason [{}][{}])", index, reason, extraInfo);
logger.info("{} closing index service (reason [{}][{}])", index, reason, extraInfo);
indexService.close(extraInfo, reason == IndexRemovalReason.DELETED);
logger.debug("{} closed... (reason [{}][{}])", index, reason, extraInfo);
logger.info("{} closed... (reason [{}][{}])", index, reason, extraInfo);
final IndexSettings indexSettings = indexService.getIndexSettings();
listener.afterIndexRemoved(indexService.index(), indexSettings, reason);
if (reason == IndexRemovalReason.DELETED) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.admin.indices.scale.searchonly;

import org.opensearch.Version;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.routing.IndexRoutingTable;
import org.opensearch.cluster.routing.RecoverySource;
import org.opensearch.cluster.routing.RoutingTable;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.UnassignedInfo;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.index.Index;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.test.OpenSearchTestCase;

import java.util.HashMap;
import java.util.Map;

import static org.opensearch.action.admin.indices.scale.searchonly.TransportSearchOnlyAction.INDEX_SEARCHONLY_BLOCK;
import static org.opensearch.action.admin.indices.scale.searchonly.TransportSearchOnlyAction.INDEX_SEARCHONLY_BLOCK_ID;
import static org.opensearch.cluster.metadata.IndexMetadata.INDEX_BLOCKS_SEARCH_ONLY_SETTING;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_CREATION_DATE;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_INDEX_UUID;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REMOTE_STORE_ENABLED;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REPLICATION_TYPE;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_VERSION_CREATED;
import static org.opensearch.cluster.routing.ShardRoutingState.UNASSIGNED;

public class SearchOnlyClusterStateBuilderTests extends OpenSearchTestCase {

private SearchOnlyClusterStateBuilder builder;
private ClusterState initialState;
private String testIndex;
private IndexMetadata indexMetadata;

@Override
public void setUp() throws Exception {
super.setUp();
builder = new SearchOnlyClusterStateBuilder();
testIndex = "test_index";

// Create basic index metadata with segment replication enabled
Settings indexSettings = Settings.builder()
.put(SETTING_VERSION_CREATED, Version.CURRENT)
.put(SETTING_INDEX_UUID, randomAlphaOfLength(8))
.put(SETTING_NUMBER_OF_SHARDS, 1)
.put(SETTING_NUMBER_OF_REPLICAS, 1)
.put(SETTING_NUMBER_OF_SEARCH_REPLICAS, 1) // Add search replicas
.put(SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) // Enable segment replication
.put(SETTING_REMOTE_STORE_ENABLED, true) // Enable remote store
.put(SETTING_CREATION_DATE, System.currentTimeMillis())
.build();

indexMetadata = IndexMetadata.builder(testIndex).settings(indexSettings).build();

// Create initial cluster state with routing table
Metadata metadata = Metadata.builder().put(indexMetadata, true).build();

initialState = ClusterState.builder(ClusterState.EMPTY_STATE)
.metadata(metadata)
.routingTable(RoutingTable.builder().addAsNew(indexMetadata).build())
.build();
}

public void testBuildScaleDownState() {
Map<Index, org.opensearch.cluster.block.ClusterBlock> blockedIndices = new HashMap<>();

// Execute scale down state build
ClusterState newState = builder.buildScaleDownState(initialState, testIndex, blockedIndices);

// Verify block was added
assertTrue("Scale down block should be present", newState.blocks().hasIndexBlockWithId(testIndex, INDEX_SEARCHONLY_BLOCK_ID));

// Verify blocked indices map was updated
assertFalse("Blocked indices map should not be empty", blockedIndices.isEmpty());
assertEquals("Should have one blocked index", 1, blockedIndices.size());
assertTrue("Index should be in blocked indices map", blockedIndices.containsKey(indexMetadata.getIndex()));
}

public void testBuildFinalScaleDownState() {
Map<Index, org.opensearch.cluster.block.ClusterBlock> blockedIndices = new HashMap<>();
ClusterState stateWithBlock = builder.buildScaleDownState(initialState, testIndex, blockedIndices);

ClusterState finalState = builder.buildFinalScaleDownState(stateWithBlock, testIndex);

// Verify blocks
assertFalse(
"Temporary block should be removed",
finalState.blocks().hasIndexBlock(testIndex, blockedIndices.get(indexMetadata.getIndex()))
);
assertTrue("Search-only block should be present", finalState.blocks().hasIndexBlock(testIndex, INDEX_SEARCHONLY_BLOCK));

// Verify metadata was updated
IndexMetadata updatedMetadata = finalState.metadata().index(testIndex);
assertTrue(
"Index should be marked as search-only",
updatedMetadata.getSettings().getAsBoolean(INDEX_BLOCKS_SEARCH_ONLY_SETTING.getKey(), false)
);
}

public void testBuildScaleUpRoutingTable() {
// Prepare a proper search-only state
Settings scaleUpSettings = Settings.builder()
.put(indexMetadata.getSettings())
.put(INDEX_BLOCKS_SEARCH_ONLY_SETTING.getKey(), true)
.build();

IndexMetadata searchOnlyMetadata = IndexMetadata.builder(indexMetadata).settings(scaleUpSettings).build();

// Create search-only shard routing
ShardRouting searchOnlyShard = ShardRouting.newUnassigned(
new ShardId(searchOnlyMetadata.getIndex(), 0),
false, // not primary
true, // search only
RecoverySource.EmptyStoreRecoverySource.INSTANCE,
new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "test")
);

// Build routing table with search-only shard
IndexRoutingTable.Builder routingTableBuilder = new IndexRoutingTable.Builder(searchOnlyMetadata.getIndex()).addShard(
searchOnlyShard
);

ClusterState searchOnlyState = ClusterState.builder(initialState)
.metadata(Metadata.builder(initialState.metadata()).put(searchOnlyMetadata, true))
.routingTable(RoutingTable.builder().add(routingTableBuilder.build()).build())
.build();

// Execute scale up
RoutingTable newRoutingTable = builder.buildScaleUpRoutingTable(searchOnlyState, testIndex);

// Verify routing table
IndexRoutingTable indexRoutingTable = newRoutingTable.index(testIndex);
assertNotNull("Index routing table should exist", indexRoutingTable);

// Verify primary shard was added
boolean hasPrimary = indexRoutingTable.shardsWithState(UNASSIGNED).stream().anyMatch(ShardRouting::primary);
assertTrue("Should have an unassigned primary shard", hasPrimary);

// Verify regular replicas were added (excluding search replicas)
long replicaCount = indexRoutingTable.shardsWithState(UNASSIGNED)
.stream()
.filter(shard -> !shard.primary() && !shard.isSearchOnly())
.count();
assertEquals("Should have correct number of replica shards", indexMetadata.getNumberOfReplicas(), replicaCount);

// Verify search replicas were preserved
long searchReplicaCount = indexRoutingTable.shardsWithState(UNASSIGNED).stream().filter(ShardRouting::isSearchOnly).count();
assertEquals("Should preserve search replicas", indexMetadata.getNumberOfSearchOnlyReplicas(), searchReplicaCount);
}

public void testBuildFinalScaleDownStateWithInvalidIndex() {
expectThrows(IllegalStateException.class, () -> builder.buildFinalScaleDownState(initialState, "nonexistent_index"));
}
}
Loading

0 comments on commit 62d23ab

Please sign in to comment.