Skip to content

Commit

Permalink
Upstream Fetch
Browse files Browse the repository at this point in the history
Signed-off-by: Prudhvi Godithi <pgodithi@amazon.com>
  • Loading branch information
prudhvigodithi committed Feb 22, 2025
1 parent 6e2bb1d commit 602854c
Show file tree
Hide file tree
Showing 17 changed files with 2,163 additions and 102 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,6 @@
* compatible open source license.
*/

/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.admin.indices.scale.searchonly;

import org.opensearch.action.admin.indices.settings.get.GetSettingsResponse;
Expand All @@ -34,6 +26,8 @@
import org.opensearch.remotestore.RemoteStoreBaseIntegTestCase;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.util.concurrent.TimeUnit;

import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS;
Expand Down Expand Up @@ -101,7 +95,7 @@ public void testScaleDownToSearchOnly() throws Exception {
assertEquals(1, shardTable.searchOnlyReplicas().stream().filter(ShardRouting::active).count());

// Scale down to search-only mode
assertAcked(client().admin().indices().prepareSearchOnly(TEST_INDEX).setScaleDown(true).get());
assertAcked(client().admin().indices().prepareSearchOnly(TEST_INDEX).setSearchOnly(true).get());

// Verify index is in search-only mode
GetSettingsResponse settingsResponse = client().admin().indices().prepareGetSettings(TEST_INDEX).get();
Expand All @@ -115,17 +109,18 @@ public void testScaleDownToSearchOnly() throws Exception {
try {
client().prepareIndex(TEST_INDEX).setId("new-doc").setSource("field1", "new-value").get();
fail("Expected ClusterBlockException");
} catch (ClusterBlockException e) {
// Expected exception
}
} catch (ClusterBlockException ignored) {}

// Verify routing table structure
assertEquals(0, getClusterState().routingTable().index(TEST_INDEX).shard(0).writerReplicas().size());
assertEquals(
1,
getClusterState().routingTable().index(TEST_INDEX).shard(0).searchOnlyReplicas().stream().filter(ShardRouting::active).count()
);
assertNull(shardTable.primaryShard());
assertBusy(() -> {
IndexShardRoutingTable currentShardTable = getClusterState().routingTable().index(TEST_INDEX).shard(0);
assertNull("Primary shard should be null after scale-down", currentShardTable.primaryShard());
});

}

Expand All @@ -143,75 +138,80 @@ public void testScaleUpFromSearchOnly() throws Exception {
createIndex(TEST_INDEX, specificSettings);
ensureGreen(TEST_INDEX);

// Index documents and ensure they're visible
for (int i = 0; i < 5; i++) {
client().prepareIndex(TEST_INDEX)
IndexResponse indexResponse = client().prepareIndex(TEST_INDEX)
.setId(Integer.toString(i))
.setSource("field1", "value" + i)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.get();
assertEquals(RestStatus.CREATED, indexResponse.status());
}

// Verify initial state has expected replica counts
IndexShardRoutingTable initialShardTable = getClusterState().routingTable().index(TEST_INDEX).shard(0);
assertEquals(1, initialShardTable.writerReplicas().size());
assertEquals(1, initialShardTable.searchOnlyReplicas().stream().filter(ShardRouting::active).count());
// Verify initial document count
assertBusy(() -> {
SearchResponse response = client().prepareSearch(TEST_INDEX).get();
assertHitCount(response, 5);
});

// Scale down to search-only mode
assertAcked(client().admin().indices().prepareSearchOnly(TEST_INDEX).setScaleDown(true).get());
GetSettingsResponse settingsResponse = client().admin().indices().prepareGetSettings(TEST_INDEX).get();
assertTrue(settingsResponse.getSetting(TEST_INDEX, IndexMetadata.INDEX_BLOCKS_SEARCH_ONLY_SETTING.getKey()).equals("true"));
assertAcked(client().admin().indices().prepareSearchOnly(TEST_INDEX).setSearchOnly(true).get());
ensureGreen(TEST_INDEX);

// Wait for search-only mode to stabilize and verify shard state
// Verify documents are still accessible in search-only mode
assertBusy(() -> {
ClusterState state = client().admin().cluster().prepareState().get().getState();
IndexShardRoutingTable shardTable = state.routingTable().index(TEST_INDEX).shard(0);
// In search-only mode, there should be no primary shard
assertNull("Primary should be null in search-only mode", shardTable.primaryShard());
// Only search replicas should remain
assertEquals(0, shardTable.writerReplicas().size());
// All shards should be search-only replicas and STARTED
for (ShardRouting shard : shardTable) {
assertTrue("All shards should be search-only replicas", shard.isSearchOnly());
assertTrue("All search replicas should be started", shard.active());
}
SearchResponse response = client().prepareSearch(TEST_INDEX).get();
assertHitCount(response, 5);
});

// Scale back up to normal mode
assertAcked(client().admin().indices().prepareSearchOnly(TEST_INDEX).setScaleDown(false).get());
assertAcked(client().admin().indices().prepareSearchOnly(TEST_INDEX).setSearchOnly(false).get());

// Wait for index to be fully operational
ensureGreen(TEST_INDEX);
// After scaling up, wait for routing table to stabilize with the expected number of replicas

// Wait for shards to be properly allocated
assertBusy(() -> {
ClusterState state = client().admin().cluster().prepareState().get().getState();
IndexRoutingTable routingTable = state.routingTable().index(TEST_INDEX);
int totalPrimaries = 0;
int totalWriterReplicas = 0;
int totalSearchReplicas = 0;
for (IndexShardRoutingTable shardTable : routingTable) {
if (shardTable.primaryShard() != null) {
if (shardTable.primaryShard() != null && shardTable.primaryShard().active()) {
totalPrimaries++;
}
totalWriterReplicas += shardTable.writerReplicas().size();
totalWriterReplicas += shardTable.writerReplicas().stream().filter(ShardRouting::active).count();
totalSearchReplicas += shardTable.searchOnlyReplicas().stream().filter(ShardRouting::active).count();
}
assertEquals("Expected 2 primary", 2, totalPrimaries);
assertEquals("Expected 2 writer replicas", 2, totalWriterReplicas);
assertEquals("Expected 2 search replica", 2, totalSearchReplicas);
});

// Verify index is no longer in search-only mode
settingsResponse = client().admin().indices().prepareGetSettings(TEST_INDEX).get();
assertTrue(settingsResponse.getSetting(TEST_INDEX, IndexMetadata.INDEX_BLOCKS_SEARCH_ONLY_SETTING.getKey()).equals("false"));
// Wait for documents to be fully recovered
assertBusy(() -> {
SearchResponse response = client().prepareSearch(TEST_INDEX).get();
assertHitCount(response, 5);
}, 30, TimeUnit.SECONDS);

// Verify we can search existing data
// Final verification of search and write capabilities
SearchResponse searchResponse = client().prepareSearch(TEST_INDEX).get();
assertHitCount(searchResponse, 3);
assertHitCount(searchResponse, 5);

// Verify we can write to the index again
// Verify we can write new documents
IndexResponse indexResponse = client().prepareIndex(TEST_INDEX)
.setId("new-doc")
.setSource("field1", "new-value")
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.get();
assertEquals(RestStatus.CREATED, indexResponse.status());

assertBusy(() -> {
SearchResponse finalResponse = client().prepareSearch(TEST_INDEX).get();
assertHitCount(finalResponse, 6);
});

}

public void testScaleDownValidationWithoutSearchReplicas() {
Expand All @@ -230,7 +230,7 @@ public void testScaleDownValidationWithoutSearchReplicas() {
// Attempt to scale down should fail due to missing segment replication
IllegalArgumentException exception = expectThrows(
IllegalArgumentException.class,
() -> client().admin().indices().prepareSearchOnly(TEST_INDEX).setScaleDown(true).get()
() -> client().admin().indices().prepareSearchOnly(TEST_INDEX).setSearchOnly(true).get()
);

// Verify exception message mentions missing segment replication requirement
Expand Down Expand Up @@ -283,7 +283,7 @@ public void testSearchOnlyRecoveryWithPersistentData() throws Exception {
});

// Enable search-only mode
assertAcked(client().admin().indices().prepareSearchOnly(TEST_INDEX).setScaleDown(true).get());
assertAcked(client().admin().indices().prepareSearchOnly(TEST_INDEX).setSearchOnly(true).get());

// Verify only search replicas are active
assertBusy(() -> {
Expand Down Expand Up @@ -353,7 +353,7 @@ public void testClusterRemoteStoreStateEnabled() throws Exception {
ensureGreen(TEST_INDEX);

// Enable search-only mode
assertAcked(client().admin().indices().prepareSearchOnly(TEST_INDEX).setScaleDown(true).get());
assertAcked(client().admin().indices().prepareSearchOnly(TEST_INDEX).setSearchOnly(true).get());

// Stop all nodes
internalCluster().stopAllNodes();
Expand Down Expand Up @@ -422,7 +422,7 @@ public void testRecoveryWithPersistentDataAndRemoteStore() throws Exception {
});

// Enable search-only mode
assertAcked(client().admin().indices().prepareSearchOnly(TEST_INDEX).setScaleDown(true).get());
assertAcked(client().admin().indices().prepareSearchOnly(TEST_INDEX).setSearchOnly(true).get());

// Verify only search replicas remain active
assertBusy(() -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,11 @@

package org.opensearch.indices.replication;

import org.opensearch.action.admin.cluster.remotestore.restore.RestoreRemoteStoreRequest;
import org.opensearch.action.admin.cluster.remotestore.restore.RestoreRemoteStoreResponse;
import org.opensearch.action.admin.indices.settings.get.GetSettingsResponse;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.common.settings.Settings;
Expand All @@ -21,7 +25,9 @@

import java.util.List;

import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;

Expand Down Expand Up @@ -102,8 +108,8 @@ private void bootstrapIndexWithOutSearchReplicas(ReplicationType replicationType

Settings settings = Settings.builder()
.put(super.indexSettings())
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
.put(SETTING_NUMBER_OF_SHARDS, 1)
.put(SETTING_NUMBER_OF_REPLICAS, 1)
.put(SETTING_NUMBER_OF_SEARCH_REPLICAS, 0)
.put(IndexMetadata.SETTING_REPLICATION_TYPE, replicationType)
.build();
Expand All @@ -114,13 +120,29 @@ private void bootstrapIndexWithOutSearchReplicas(ReplicationType replicationType
ensureGreen(INDEX_NAME);
}

public void testRemoteStoreRestoreFailsForSearchOnlyIndex() throws Exception {
bootstrapIndexWithSearchReplicas();
assertAcked(client().admin().indices().prepareSearchOnly(INDEX_NAME).setSearchOnly(true).get());

GetSettingsResponse settingsResponse = client().admin().indices().prepareGetSettings(INDEX_NAME).get();
assertEquals("true", settingsResponse.getSetting(INDEX_NAME, IndexMetadata.INDEX_BLOCKS_SEARCH_ONLY_SETTING.getKey()));

IllegalArgumentException exception = expectThrows(IllegalArgumentException.class, () -> {
PlainActionFuture<RestoreRemoteStoreResponse> future = PlainActionFuture.newFuture();
client().admin().cluster().restoreRemoteStore(new RestoreRemoteStoreRequest().indices(INDEX_NAME), future);
future.actionGet();
});

assertTrue(exception.getMessage().contains("Cannot restore index [" + INDEX_NAME + "] because search-only mode is enabled"));
}

private void bootstrapIndexWithSearchReplicas() throws InterruptedException {
startCluster(3);

Settings settings = Settings.builder()
.put(super.indexSettings())
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
.put(SETTING_NUMBER_OF_SHARDS, 1)
.put(SETTING_NUMBER_OF_REPLICAS, 1)
.put(SETTING_NUMBER_OF_SEARCH_REPLICAS, 1)
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
* index state compatibility, and configuration prerequisites such as remote store
* and segment replication settings.
*/
class ScaleOperationValidator {
class SearchOnlyOperationValidator {

/**
* Validates that the given index meets the prerequisites for the scale operation.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,11 @@ public SearchOnlyRequestBuilder(OpenSearchClient client, boolean scaleDown, Stri
* <li>Scale up an index from search-only mode back to full read-write operation</li>
* </ul>
*
* @param scaleDown true if scaling down to search-only mode, false if scaling up to normal operation
* @param searchOnly true if scaling down to search-only mode, false if scaling up to normal operation
* @return this builder (for method chaining)
*/
public SearchOnlyRequestBuilder setScaleDown(boolean scaleDown) {
request.scaleDown(scaleDown);
public SearchOnlyRequestBuilder setSearchOnly(boolean searchOnly) {
request.scaleDown(searchOnly);
return this;

Check warning on line 65 in server/src/main/java/org/opensearch/action/admin/indices/scale/searchonly/SearchOnlyRequestBuilder.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/admin/indices/scale/searchonly/SearchOnlyRequestBuilder.java#L64-L65

Added lines #L64 - L65 were not covered by tests
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public class TransportSearchOnlyAction extends TransportClusterManagerNodeAction
private final IndicesService indicesService;
private final TransportService transportService;

private final ScaleOperationValidator validator;
private final SearchOnlyOperationValidator validator;
private final SearchOnlyClusterStateBuilder searchOnlyClusterStateBuilder;
private final SearchOnlyShardSyncManager searchOnlyShardSyncManager;

Expand Down Expand Up @@ -131,7 +131,7 @@ public TransportSearchOnlyAction(
this.allocationService = allocationService;
this.indicesService = indicesService;
this.transportService = transportService;
this.validator = new ScaleOperationValidator();
this.validator = new SearchOnlyOperationValidator();
this.searchOnlyClusterStateBuilder = new SearchOnlyClusterStateBuilder();
this.searchOnlyShardSyncManager = new SearchOnlyShardSyncManager(clusterService, transportService, NAME);

Expand Down Expand Up @@ -239,7 +239,7 @@ private void finalizeScaleDown(String index, ActionListener<AcknowledgedResponse
/**
* Handles an incoming shard sync request from another node.
*/
private void handleShardSyncRequest(NodeSearchOnlyRequest request, TransportChannel channel) throws Exception {
void handleShardSyncRequest(NodeSearchOnlyRequest request, TransportChannel channel) throws Exception {
logger.info("Handling shard sync request for index [{}]", request.getIndex());
ClusterState state = clusterService.state();

Expand Down Expand Up @@ -277,7 +277,7 @@ private List<ShardSearchOnlyResponse> syncShards(IndexService indexService, List
return shardResponses;
}

private ShardSearchOnlyResponse syncSingleShard(IndexShard shard) throws Exception {
ShardSearchOnlyResponse syncSingleShard(IndexShard shard) throws Exception {
logger.info("Performing final sync and flush for shard {}", shard.shardId());
shard.sync();
shard.flush(new FlushRequest().force(true).waitIfOngoing(true));
Expand Down Expand Up @@ -331,7 +331,7 @@ protected ClusterBlockException checkBlock(SearchOnlyRequest request, ClusterSta
* <li>Initiates shard synchronization after the block is applied</li>
* </ul>
*/
private class AddBlockClusterStateUpdateTask extends ClusterStateUpdateTask {
class AddBlockClusterStateUpdateTask extends ClusterStateUpdateTask {
private final String index;
private final Map<Index, ClusterBlock> blockedIndices;
private final ActionListener<AcknowledgedResponse> listener;
Expand Down Expand Up @@ -390,7 +390,7 @@ public void onFailure(String source, Exception e) {
* <li>Updates the routing table to remove non-search-only shards</li>
* </ul>
*/
private class FinalizeScaleDownTask extends ClusterStateUpdateTask {
class FinalizeScaleDownTask extends ClusterStateUpdateTask {
private final String index;
private final ActionListener<AcknowledgedResponse> listener;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,40 +167,31 @@ public RemoteRestoreResult restore(
IndicesOptions.fromOptions(true, true, true, true)
);

boolean allSearchOnly = true;
for (String indexName : filteredIndices) {
IndexMetadata indexMetadata = currentState.metadata().index(indexName);
if (indexMetadata == null) {
logger.warn("Skipping restore: index [{}] does not exist.", indexName);
logger.warn("Index restore is not supported for non-existent index. Skipping: {}", indexName);
continue;

Check warning on line 174 in server/src/main/java/org/opensearch/index/recovery/RemoteStoreRestoreService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/recovery/RemoteStoreRestoreService.java#L174

Added line #L174 was not covered by tests
}

boolean isSearchOnly = indexMetadata.getSettings()
.getAsBoolean(IndexMetadata.INDEX_BLOCKS_SEARCH_ONLY_SETTING.getKey(), false);

Check warning on line 177 in server/src/main/java/org/opensearch/index/recovery/RemoteStoreRestoreService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/recovery/RemoteStoreRestoreService.java#L176-L177

Added lines #L176 - L177 were not covered by tests

if (isSearchOnly) {
logger.warn("Skipping _remotestore/_restore for index [{}] as search-only mode is enabled.", indexName);
} else if (restoreAllShards && IndexMetadata.State.CLOSE.equals(indexMetadata.getState()) == false) {
throw new IllegalArgumentException(
String.format(Locale.ROOT, "Cannot restore index [%s] because search-only mode is enabled", indexName)

Check warning on line 180 in server/src/main/java/org/opensearch/index/recovery/RemoteStoreRestoreService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/recovery/RemoteStoreRestoreService.java#L179-L180

Added lines #L179 - L180 were not covered by tests
);
}
if (restoreAllShards && IndexMetadata.State.CLOSE.equals(indexMetadata.getState()) == false) {
throw new IllegalStateException(
String.format(
Locale.ROOT,
"cannot restore index [%s] because an open index with same name/uuid already exists in the cluster.",
indexName
) + " Close the existing index."
);
} else {
allSearchOnly = false;
indexMetadataMap.put(indexName, new Tuple<>(false, indexMetadata));
}
}

if (allSearchOnly) {
throw new IllegalArgumentException(
"Skipping _remotestore/_restore for all selected indices as search-only mode is enabled."
);
indexMetadataMap.put(indexName, new Tuple<>(false, indexMetadata));

Check warning on line 192 in server/src/main/java/org/opensearch/index/recovery/RemoteStoreRestoreService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/recovery/RemoteStoreRestoreService.java#L192

Added line #L192 was not covered by tests
}
}

return executeRestore(currentState, indexMetadataMap, restoreAllShards, remoteState);
}

Expand Down
Loading

0 comments on commit 602854c

Please sign in to comment.