diff --git a/server/src/internalClusterTest/java/org/opensearch/action/admin/indices/scale/searchonly/SearchOnlyScaleIT.java b/server/src/internalClusterTest/java/org/opensearch/action/admin/indices/scale/searchonly/SearchOnlyScaleIT.java index f34c3a7e7cd0a..1948c4c747345 100644 --- a/server/src/internalClusterTest/java/org/opensearch/action/admin/indices/scale/searchonly/SearchOnlyScaleIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/action/admin/indices/scale/searchonly/SearchOnlyScaleIT.java @@ -6,14 +6,6 @@ * compatible open source license. */ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - package org.opensearch.action.admin.indices.scale.searchonly; import org.opensearch.action.admin.indices.settings.get.GetSettingsResponse; @@ -115,9 +107,7 @@ public void testScaleDownToSearchOnly() throws Exception { try { client().prepareIndex(TEST_INDEX).setId("new-doc").setSource("field1", "new-value").get(); fail("Expected ClusterBlockException"); - } catch (ClusterBlockException e) { - // Expected exception - } + } catch (ClusterBlockException ignored) {} // Verify routing table structure assertEquals(0, getClusterState().routingTable().index(TEST_INDEX).shard(0).writerReplicas().size()); @@ -125,7 +115,10 @@ public void testScaleDownToSearchOnly() throws Exception { 1, getClusterState().routingTable().index(TEST_INDEX).shard(0).searchOnlyReplicas().stream().filter(ShardRouting::active).count() ); - assertNull(shardTable.primaryShard()); + assertBusy(() -> { + IndexShardRoutingTable currentShardTable = getClusterState().routingTable().index(TEST_INDEX).shard(0); + assertNull("Primary shard should be null after scale-down", currentShardTable.primaryShard()); + }); } @@ -144,11 +137,12 @@ public void testScaleUpFromSearchOnly() throws Exception { ensureGreen(TEST_INDEX); for (int i = 0; i < 5; i++) { - client().prepareIndex(TEST_INDEX) + IndexResponse indexResponse = client().prepareIndex(TEST_INDEX) .setId(Integer.toString(i)) .setSource("field1", "value" + i) .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) .get(); + assertEquals(RestStatus.CREATED, indexResponse.status()); } // Verify initial state has expected replica counts @@ -203,7 +197,7 @@ public void testScaleUpFromSearchOnly() throws Exception { // Verify we can search existing data SearchResponse searchResponse = client().prepareSearch(TEST_INDEX).get(); - assertHitCount(searchResponse, 3); + assertHitCount(searchResponse, 5); // Verify we can write to the index again IndexResponse indexResponse = client().prepareIndex(TEST_INDEX) diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SearchReplicaRestoreIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SearchReplicaRestoreIT.java index e8d65e07c7dd9..5dcca0540f926 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SearchReplicaRestoreIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SearchReplicaRestoreIT.java @@ -8,7 +8,11 @@ package org.opensearch.indices.replication; +import org.opensearch.action.admin.cluster.remotestore.restore.RestoreRemoteStoreRequest; +import org.opensearch.action.admin.cluster.remotestore.restore.RestoreRemoteStoreResponse; +import org.opensearch.action.admin.indices.settings.get.GetSettingsResponse; import org.opensearch.action.search.SearchResponse; +import org.opensearch.action.support.PlainActionFuture; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.metadata.Metadata; import org.opensearch.common.settings.Settings; @@ -21,7 +25,9 @@ import java.util.List; +import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS; import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS; +import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount; @@ -102,8 +108,8 @@ private void bootstrapIndexWithOutSearchReplicas(ReplicationType replicationType Settings settings = Settings.builder() .put(super.indexSettings()) - .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) - .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1) + .put(SETTING_NUMBER_OF_SHARDS, 1) + .put(SETTING_NUMBER_OF_REPLICAS, 1) .put(SETTING_NUMBER_OF_SEARCH_REPLICAS, 0) .put(IndexMetadata.SETTING_REPLICATION_TYPE, replicationType) .build(); @@ -114,13 +120,29 @@ private void bootstrapIndexWithOutSearchReplicas(ReplicationType replicationType ensureGreen(INDEX_NAME); } + public void testRemoteStoreRestoreFailsForSearchOnlyIndex() throws Exception { + bootstrapIndexWithSearchReplicas(); + assertAcked(client().admin().indices().prepareSearchOnly(INDEX_NAME).setScaleDown(true).get()); + + GetSettingsResponse settingsResponse = client().admin().indices().prepareGetSettings(INDEX_NAME).get(); + assertEquals("true", settingsResponse.getSetting(INDEX_NAME, IndexMetadata.INDEX_BLOCKS_SEARCH_ONLY_SETTING.getKey())); + + IllegalArgumentException exception = expectThrows(IllegalArgumentException.class, () -> { + PlainActionFuture future = PlainActionFuture.newFuture(); + client().admin().cluster().restoreRemoteStore(new RestoreRemoteStoreRequest().indices(INDEX_NAME), future); + future.actionGet(); + }); + + assertTrue(exception.getMessage().contains("Cannot restore index [" + INDEX_NAME + "] because search-only mode is enabled")); + } + private void bootstrapIndexWithSearchReplicas() throws InterruptedException { startCluster(3); Settings settings = Settings.builder() .put(super.indexSettings()) - .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) - .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1) + .put(SETTING_NUMBER_OF_SHARDS, 1) + .put(SETTING_NUMBER_OF_REPLICAS, 1) .put(SETTING_NUMBER_OF_SEARCH_REPLICAS, 1) .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) .build(); diff --git a/server/src/main/java/org/opensearch/action/admin/indices/scale/searchonly/ScaleOperationValidator.java b/server/src/main/java/org/opensearch/action/admin/indices/scale/searchonly/SearchOnlyOperationValidator.java similarity index 99% rename from server/src/main/java/org/opensearch/action/admin/indices/scale/searchonly/ScaleOperationValidator.java rename to server/src/main/java/org/opensearch/action/admin/indices/scale/searchonly/SearchOnlyOperationValidator.java index cd89c6f246419..4362ba608ab69 100644 --- a/server/src/main/java/org/opensearch/action/admin/indices/scale/searchonly/ScaleOperationValidator.java +++ b/server/src/main/java/org/opensearch/action/admin/indices/scale/searchonly/SearchOnlyOperationValidator.java @@ -21,7 +21,7 @@ * index state compatibility, and configuration prerequisites such as remote store * and segment replication settings. */ -class ScaleOperationValidator { +class SearchOnlyOperationValidator { /** * Validates that the given index meets the prerequisites for the scale operation. diff --git a/server/src/main/java/org/opensearch/action/admin/indices/scale/searchonly/TransportSearchOnlyAction.java b/server/src/main/java/org/opensearch/action/admin/indices/scale/searchonly/TransportSearchOnlyAction.java index 5318533e098df..74bd86d62f794 100644 --- a/server/src/main/java/org/opensearch/action/admin/indices/scale/searchonly/TransportSearchOnlyAction.java +++ b/server/src/main/java/org/opensearch/action/admin/indices/scale/searchonly/TransportSearchOnlyAction.java @@ -76,7 +76,7 @@ public class TransportSearchOnlyAction extends TransportClusterManagerNodeAction private final IndicesService indicesService; private final TransportService transportService; - private final ScaleOperationValidator validator; + private final SearchOnlyOperationValidator validator; private final SearchOnlyClusterStateBuilder searchOnlyClusterStateBuilder; private final SearchOnlyShardSyncManager searchOnlyShardSyncManager; @@ -131,7 +131,7 @@ public TransportSearchOnlyAction( this.allocationService = allocationService; this.indicesService = indicesService; this.transportService = transportService; - this.validator = new ScaleOperationValidator(); + this.validator = new SearchOnlyOperationValidator(); this.searchOnlyClusterStateBuilder = new SearchOnlyClusterStateBuilder(); this.searchOnlyShardSyncManager = new SearchOnlyShardSyncManager(clusterService, transportService, NAME); diff --git a/server/src/main/java/org/opensearch/index/recovery/RemoteStoreRestoreService.java b/server/src/main/java/org/opensearch/index/recovery/RemoteStoreRestoreService.java index ddc695a3cac4b..752203a7bd19b 100644 --- a/server/src/main/java/org/opensearch/index/recovery/RemoteStoreRestoreService.java +++ b/server/src/main/java/org/opensearch/index/recovery/RemoteStoreRestoreService.java @@ -167,20 +167,20 @@ public RemoteRestoreResult restore( IndicesOptions.fromOptions(true, true, true, true) ); - boolean allSearchOnly = true; for (String indexName : filteredIndices) { IndexMetadata indexMetadata = currentState.metadata().index(indexName); if (indexMetadata == null) { - logger.warn("Skipping restore: index [{}] does not exist.", indexName); + logger.warn("Index restore is not supported for non-existent index. Skipping: {}", indexName); continue; } - boolean isSearchOnly = indexMetadata.getSettings() .getAsBoolean(IndexMetadata.INDEX_BLOCKS_SEARCH_ONLY_SETTING.getKey(), false); - if (isSearchOnly) { - logger.warn("Skipping _remotestore/_restore for index [{}] as search-only mode is enabled.", indexName); - } else if (restoreAllShards && IndexMetadata.State.CLOSE.equals(indexMetadata.getState()) == false) { + throw new IllegalArgumentException( + String.format(Locale.ROOT, "Cannot restore index [%s] because search-only mode is enabled", indexName) + ); + } + if (restoreAllShards && IndexMetadata.State.CLOSE.equals(indexMetadata.getState()) == false) { throw new IllegalStateException( String.format( Locale.ROOT, @@ -188,19 +188,10 @@ public RemoteRestoreResult restore( indexName ) + " Close the existing index." ); - } else { - allSearchOnly = false; - indexMetadataMap.put(indexName, new Tuple<>(false, indexMetadata)); } - } - - if (allSearchOnly) { - throw new IllegalArgumentException( - "Skipping _remotestore/_restore for all selected indices as search-only mode is enabled." - ); + indexMetadataMap.put(indexName, new Tuple<>(false, indexMetadata)); } } - return executeRestore(currentState, indexMetadataMap, restoreAllShards, remoteState); } diff --git a/server/src/main/java/org/opensearch/indices/IndicesService.java b/server/src/main/java/org/opensearch/indices/IndicesService.java index d679240955a07..35e30e37846fa 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesService.java +++ b/server/src/main/java/org/opensearch/indices/IndicesService.java @@ -1185,9 +1185,9 @@ public void removeIndex(final Index index, final IndexRemovalReason reason, fina } listener.beforeIndexRemoved(indexService, reason); - logger.debug("{} closing index service (reason [{}][{}])", index, reason, extraInfo); + logger.info("{} closing index service (reason [{}][{}])", index, reason, extraInfo); indexService.close(extraInfo, reason == IndexRemovalReason.DELETED); - logger.debug("{} closed... (reason [{}][{}])", index, reason, extraInfo); + logger.info("{} closed... (reason [{}][{}])", index, reason, extraInfo); final IndexSettings indexSettings = indexService.getIndexSettings(); listener.afterIndexRemoved(indexService.index(), indexSettings, reason); if (reason == IndexRemovalReason.DELETED) { diff --git a/server/src/test/java/org/opensearch/action/admin/indices/scale/searchonly/SearchOnlyClusterStateBuilderTests.java b/server/src/test/java/org/opensearch/action/admin/indices/scale/searchonly/SearchOnlyClusterStateBuilderTests.java new file mode 100644 index 0000000000000..1c4bd1bb451f5 --- /dev/null +++ b/server/src/test/java/org/opensearch/action/admin/indices/scale/searchonly/SearchOnlyClusterStateBuilderTests.java @@ -0,0 +1,168 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.admin.indices.scale.searchonly; + +import org.opensearch.Version; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.metadata.Metadata; +import org.opensearch.cluster.routing.IndexRoutingTable; +import org.opensearch.cluster.routing.RecoverySource; +import org.opensearch.cluster.routing.RoutingTable; +import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.cluster.routing.UnassignedInfo; +import org.opensearch.common.settings.Settings; +import org.opensearch.core.index.Index; +import org.opensearch.core.index.shard.ShardId; +import org.opensearch.indices.replication.common.ReplicationType; +import org.opensearch.test.OpenSearchTestCase; + +import java.util.HashMap; +import java.util.Map; + +import static org.opensearch.action.admin.indices.scale.searchonly.TransportSearchOnlyAction.INDEX_SEARCHONLY_BLOCK; +import static org.opensearch.action.admin.indices.scale.searchonly.TransportSearchOnlyAction.INDEX_SEARCHONLY_BLOCK_ID; +import static org.opensearch.cluster.metadata.IndexMetadata.INDEX_BLOCKS_SEARCH_ONLY_SETTING; +import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_CREATION_DATE; +import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_INDEX_UUID; +import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS; +import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS; +import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS; +import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REMOTE_STORE_ENABLED; +import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REPLICATION_TYPE; +import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_VERSION_CREATED; +import static org.opensearch.cluster.routing.ShardRoutingState.UNASSIGNED; + +public class SearchOnlyClusterStateBuilderTests extends OpenSearchTestCase { + + private SearchOnlyClusterStateBuilder builder; + private ClusterState initialState; + private String testIndex; + private IndexMetadata indexMetadata; + + @Override + public void setUp() throws Exception { + super.setUp(); + builder = new SearchOnlyClusterStateBuilder(); + testIndex = "test_index"; + + // Create basic index metadata with segment replication enabled + Settings indexSettings = Settings.builder() + .put(SETTING_VERSION_CREATED, Version.CURRENT) + .put(SETTING_INDEX_UUID, randomAlphaOfLength(8)) + .put(SETTING_NUMBER_OF_SHARDS, 1) + .put(SETTING_NUMBER_OF_REPLICAS, 1) + .put(SETTING_NUMBER_OF_SEARCH_REPLICAS, 1) // Add search replicas + .put(SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) // Enable segment replication + .put(SETTING_REMOTE_STORE_ENABLED, true) // Enable remote store + .put(SETTING_CREATION_DATE, System.currentTimeMillis()) + .build(); + + indexMetadata = IndexMetadata.builder(testIndex).settings(indexSettings).build(); + + // Create initial cluster state with routing table + Metadata metadata = Metadata.builder().put(indexMetadata, true).build(); + + initialState = ClusterState.builder(ClusterState.EMPTY_STATE) + .metadata(metadata) + .routingTable(RoutingTable.builder().addAsNew(indexMetadata).build()) + .build(); + } + + public void testBuildScaleDownState() { + Map blockedIndices = new HashMap<>(); + + // Execute scale down state build + ClusterState newState = builder.buildScaleDownState(initialState, testIndex, blockedIndices); + + // Verify block was added + assertTrue("Scale down block should be present", newState.blocks().hasIndexBlockWithId(testIndex, INDEX_SEARCHONLY_BLOCK_ID)); + + // Verify blocked indices map was updated + assertFalse("Blocked indices map should not be empty", blockedIndices.isEmpty()); + assertEquals("Should have one blocked index", 1, blockedIndices.size()); + assertTrue("Index should be in blocked indices map", blockedIndices.containsKey(indexMetadata.getIndex())); + } + + public void testBuildFinalScaleDownState() { + Map blockedIndices = new HashMap<>(); + ClusterState stateWithBlock = builder.buildScaleDownState(initialState, testIndex, blockedIndices); + + ClusterState finalState = builder.buildFinalScaleDownState(stateWithBlock, testIndex); + + // Verify blocks + assertFalse( + "Temporary block should be removed", + finalState.blocks().hasIndexBlock(testIndex, blockedIndices.get(indexMetadata.getIndex())) + ); + assertTrue("Search-only block should be present", finalState.blocks().hasIndexBlock(testIndex, INDEX_SEARCHONLY_BLOCK)); + + // Verify metadata was updated + IndexMetadata updatedMetadata = finalState.metadata().index(testIndex); + assertTrue( + "Index should be marked as search-only", + updatedMetadata.getSettings().getAsBoolean(INDEX_BLOCKS_SEARCH_ONLY_SETTING.getKey(), false) + ); + } + + public void testBuildScaleUpRoutingTable() { + // Prepare a proper search-only state + Settings scaleUpSettings = Settings.builder() + .put(indexMetadata.getSettings()) + .put(INDEX_BLOCKS_SEARCH_ONLY_SETTING.getKey(), true) + .build(); + + IndexMetadata searchOnlyMetadata = IndexMetadata.builder(indexMetadata).settings(scaleUpSettings).build(); + + // Create search-only shard routing + ShardRouting searchOnlyShard = ShardRouting.newUnassigned( + new ShardId(searchOnlyMetadata.getIndex(), 0), + false, // not primary + true, // search only + RecoverySource.EmptyStoreRecoverySource.INSTANCE, + new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "test") + ); + + // Build routing table with search-only shard + IndexRoutingTable.Builder routingTableBuilder = new IndexRoutingTable.Builder(searchOnlyMetadata.getIndex()).addShard( + searchOnlyShard + ); + + ClusterState searchOnlyState = ClusterState.builder(initialState) + .metadata(Metadata.builder(initialState.metadata()).put(searchOnlyMetadata, true)) + .routingTable(RoutingTable.builder().add(routingTableBuilder.build()).build()) + .build(); + + // Execute scale up + RoutingTable newRoutingTable = builder.buildScaleUpRoutingTable(searchOnlyState, testIndex); + + // Verify routing table + IndexRoutingTable indexRoutingTable = newRoutingTable.index(testIndex); + assertNotNull("Index routing table should exist", indexRoutingTable); + + // Verify primary shard was added + boolean hasPrimary = indexRoutingTable.shardsWithState(UNASSIGNED).stream().anyMatch(ShardRouting::primary); + assertTrue("Should have an unassigned primary shard", hasPrimary); + + // Verify regular replicas were added (excluding search replicas) + long replicaCount = indexRoutingTable.shardsWithState(UNASSIGNED) + .stream() + .filter(shard -> !shard.primary() && !shard.isSearchOnly()) + .count(); + assertEquals("Should have correct number of replica shards", indexMetadata.getNumberOfReplicas(), replicaCount); + + // Verify search replicas were preserved + long searchReplicaCount = indexRoutingTable.shardsWithState(UNASSIGNED).stream().filter(ShardRouting::isSearchOnly).count(); + assertEquals("Should preserve search replicas", indexMetadata.getNumberOfSearchOnlyReplicas(), searchReplicaCount); + } + + public void testBuildFinalScaleDownStateWithInvalidIndex() { + expectThrows(IllegalStateException.class, () -> builder.buildFinalScaleDownState(initialState, "nonexistent_index")); + } +} diff --git a/server/src/test/java/org/opensearch/action/admin/indices/scale/searchonly/SearchOnlyRequestTests.java b/server/src/test/java/org/opensearch/action/admin/indices/scale/searchonly/SearchOnlyRequestTests.java new file mode 100644 index 0000000000000..8f522d37b8493 --- /dev/null +++ b/server/src/test/java/org/opensearch/action/admin/indices/scale/searchonly/SearchOnlyRequestTests.java @@ -0,0 +1,56 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.admin.indices.scale.searchonly; + +import org.opensearch.common.io.stream.BytesStreamOutput; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.test.OpenSearchTestCase; + +import java.io.IOException; + +public class SearchOnlyRequestTests extends OpenSearchTestCase { + + public void testSerialization() throws IOException { + SearchOnlyRequest request = new SearchOnlyRequest("test_index", true); + + BytesStreamOutput out = new BytesStreamOutput(); + request.writeTo(out); + + StreamInput in = out.bytes().streamInput(); + SearchOnlyRequest deserializedRequest = new SearchOnlyRequest(in); + + assertEquals(request.getIndex(), deserializedRequest.getIndex()); + assertEquals(request.isScaleDown(), deserializedRequest.isScaleDown()); + } + + public void testValidation() { + SearchOnlyRequest request = new SearchOnlyRequest(null, true); + assertNotNull(request.validate()); + + request = new SearchOnlyRequest("", true); + assertNotNull(request.validate()); + + request = new SearchOnlyRequest(" ", true); + assertNotNull(request.validate()); + + request = new SearchOnlyRequest("test_index", true); + assertNull(request.validate()); + } + + public void testEquals() { + SearchOnlyRequest request1 = new SearchOnlyRequest("test_index", true); + SearchOnlyRequest request2 = new SearchOnlyRequest("test_index", true); + SearchOnlyRequest request3 = new SearchOnlyRequest("other_index", true); + SearchOnlyRequest request4 = new SearchOnlyRequest("test_index", false); + + assertEquals(request1, request2); + assertNotEquals(request1, request3); + assertNotEquals(request1, request4); + } +} diff --git a/server/src/test/java/org/opensearch/action/admin/indices/scale/searchonly/SearchOnlyShardSyncManagerTests.java b/server/src/test/java/org/opensearch/action/admin/indices/scale/searchonly/SearchOnlyShardSyncManagerTests.java new file mode 100644 index 0000000000000..bd4bfad3b5d9a --- /dev/null +++ b/server/src/test/java/org/opensearch/action/admin/indices/scale/searchonly/SearchOnlyShardSyncManagerTests.java @@ -0,0 +1,347 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.admin.indices.scale.searchonly; + +import org.opensearch.Version; +import org.opensearch.cluster.ClusterName; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.node.DiscoveryNodes; +import org.opensearch.cluster.routing.IndexRoutingTable; +import org.opensearch.cluster.routing.IndexShardRoutingTable; +import org.opensearch.cluster.routing.RoutingTable; +import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.cluster.routing.ShardRoutingState; +import org.opensearch.cluster.routing.TestShardRouting; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.settings.Settings; +import org.opensearch.core.action.ActionListener; +import org.opensearch.core.index.Index; +import org.opensearch.core.index.shard.ShardId; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.transport.TransportException; +import org.opensearch.transport.TransportResponseHandler; +import org.opensearch.transport.TransportService; + +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class SearchOnlyShardSyncManagerTests extends OpenSearchTestCase { + + private ClusterService clusterService; + private TransportService transportService; + private SearchOnlyShardSyncManager syncManager; + private final String transportActionName = "dummyAction"; + + @Override + public void setUp() throws Exception { + super.setUp(); + clusterService = mock(ClusterService.class); + transportService = mock(TransportService.class); + syncManager = new SearchOnlyShardSyncManager(clusterService, transportService, transportActionName); + } + + // --- Tests for sendShardSyncRequests --- + + public void testSendShardSyncRequests_emptyPrimaryShards() { + ActionListener> listener = new ActionListener<>() { + @Override + public void onResponse(Collection responses) { + fail("Expected failure when primary shards map is empty"); + } + + @Override + public void onFailure(Exception e) { + assertTrue(e instanceof IllegalStateException); + assertEquals("No primary shards found for index test_index", e.getMessage()); + } + }; + syncManager.sendShardSyncRequests("test_index", Collections.emptyMap(), listener); + } + + public void testSendShardSyncRequests_nodeNotFound() { + // Prepare a mapping: one shard assigned to node "node1" + ShardId shardId = new ShardId(new Index("test_index", "uuid"), 0); + Map primaryShardsNodes = Collections.singletonMap(shardId, "node1"); + + // Set cluster state with empty discovery nodes so "node1" is missing. + DiscoveryNodes discoveryNodes = DiscoveryNodes.builder().build(); + ClusterState clusterState = ClusterState.builder(new ClusterName("test")).nodes(discoveryNodes).build(); + when(clusterService.state()).thenReturn(clusterState); + + ActionListener> listener = new ActionListener<>() { + @Override + public void onResponse(Collection responses) { + fail("Expected failure due to missing node"); + } + + @Override + public void onFailure(Exception e) { + assertTrue(e.getMessage().contains("Node [node1] not found")); + } + }; + + syncManager.sendShardSyncRequests("test_index", primaryShardsNodes, listener); + } + + public void testSendShardSyncRequests_success() throws Exception { + // Prepare a mapping: one shard assigned to node "node1" + ShardId shardId = new ShardId(new Index("test_index", "uuid"), 0); + Map primaryShardsNodes = Collections.singletonMap(shardId, "node1"); + + // Build cluster state with discovery node "node1" + DiscoveryNode node = new DiscoveryNode("node1", buildNewFakeTransportAddress(), Version.CURRENT); + DiscoveryNodes discoveryNodes = DiscoveryNodes.builder().add(node).localNodeId("node1").build(); + ClusterState clusterState = ClusterState.builder(new ClusterName("test")).nodes(discoveryNodes).build(); + when(clusterService.state()).thenReturn(clusterState); + + // Stub transportService.sendRequest to return a dummy response. + doAnswer(invocation -> { + TransportResponseHandler handler = invocation.getArgument(3); + handler.handleResponse(new NodeSearchOnlyResponse(node, Collections.emptyList())); + return null; + }).when(transportService) + .sendRequest( + any(DiscoveryNode.class), + eq(transportActionName), + any(NodeSearchOnlyRequest.class), + any(TransportResponseHandler.class) + ); + + CountDownLatch latch = new CountDownLatch(1); + AtomicReference> responseRef = new AtomicReference<>(); + AtomicReference exceptionRef = new AtomicReference<>(); + + ActionListener> listener = new ActionListener<>() { + @Override + public void onResponse(Collection responses) { + responseRef.set(responses); + latch.countDown(); + } + + @Override + public void onFailure(Exception e) { + exceptionRef.set(e); + latch.countDown(); + } + }; + + syncManager.sendShardSyncRequests("test_index", primaryShardsNodes, listener); + assertTrue(latch.await(5, TimeUnit.SECONDS)); + assertNull(exceptionRef.get()); + Collection responses = responseRef.get(); + assertNotNull(responses); + // We expect one response since there's one node. + assertEquals(1, responses.size()); + } + + // --- Tests for sendNodeRequest --- + + public void testSendNodeRequest_success() throws Exception { + DiscoveryNode node = new DiscoveryNode("node1", buildNewFakeTransportAddress(), Version.CURRENT); + String index = "test_index"; + List shards = Collections.singletonList(new ShardId(new Index("test_index", "uuid"), 0)); + + CountDownLatch latch = new CountDownLatch(1); + AtomicReference responseRef = new AtomicReference<>(); + + ActionListener listener = new ActionListener<>() { + @Override + public void onResponse(NodeSearchOnlyResponse response) { + responseRef.set(response); + latch.countDown(); + } + + @Override + public void onFailure(Exception e) { + fail("Unexpected failure: " + e.getMessage()); + } + }; + + doAnswer(invocation -> { + TransportResponseHandler handler = invocation.getArgument(3); + handler.handleResponse(new NodeSearchOnlyResponse(node, Collections.emptyList())); + return null; + }).when(transportService) + .sendRequest(eq(node), eq(transportActionName), any(NodeSearchOnlyRequest.class), any(TransportResponseHandler.class)); + + syncManager.sendNodeRequest(node, index, shards, listener); + assertTrue(latch.await(5, TimeUnit.SECONDS)); + assertNotNull(responseRef.get()); + } + + public void testSendNodeRequest_failure() throws Exception { + DiscoveryNode node = new DiscoveryNode("node1", buildNewFakeTransportAddress(), Version.CURRENT); + String index = "test_index"; + List shards = Collections.singletonList(new ShardId(new Index("test_index", "uuid"), 0)); + + CountDownLatch latch = new CountDownLatch(1); + AtomicReference exceptionRef = new AtomicReference<>(); + + ActionListener listener = new ActionListener<>() { + @Override + public void onResponse(NodeSearchOnlyResponse response) { + fail("Expected failure"); + } + + @Override + public void onFailure(Exception e) { + exceptionRef.set(e); + latch.countDown(); + } + }; + + // Use a dummy Throwable as the cause instead of passing the node. + doAnswer(invocation -> { + TransportResponseHandler handler = invocation.getArgument(3); + handler.handleException(new TransportException("Test exception", new Exception("dummy cause"))); + return null; + }).when(transportService) + .sendRequest(eq(node), eq(transportActionName), any(NodeSearchOnlyRequest.class), any(TransportResponseHandler.class)); + + syncManager.sendNodeRequest(node, index, shards, listener); + assertTrue(latch.await(5, TimeUnit.SECONDS)); + assertNotNull(exceptionRef.get()); + assertTrue(exceptionRef.get() instanceof TransportException); + } + + // --- Tests for validateNodeResponses --- + + public void testValidateNodeResponses_success() { + // Create a shard response with no failures. + ShardId shardId = new ShardId(new Index("test_index", "uuid"), 0); + ShardSearchOnlyResponse shardResponse = new ShardSearchOnlyResponse(shardId, false, 0); + NodeSearchOnlyResponse nodeResponse = new NodeSearchOnlyResponse( + new DiscoveryNode("node1", buildNewFakeTransportAddress(), Version.CURRENT), + Collections.singletonList(shardResponse) + ); + + List responses = Collections.singletonList(nodeResponse); + AtomicReference responseRef = new AtomicReference<>(); + AtomicReference exceptionRef = new AtomicReference<>(); + + syncManager.validateNodeResponses(responses, new ActionListener() { + @Override + public void onResponse(SearchOnlyResponse response) { + responseRef.set(response); + } + + @Override + public void onFailure(Exception e) { + exceptionRef.set(e); + } + }); + + assertNull(exceptionRef.get()); + assertNotNull(responseRef.get()); + } + + public void testValidateNodeResponses_failure_uncommitted() { + // Create a shard response indicating uncommitted operations. + ShardId shardId = new ShardId(new Index("test_index", "uuid"), 0); + ShardSearchOnlyResponse shardResponse = new ShardSearchOnlyResponse(shardId, false, 5); + NodeSearchOnlyResponse nodeResponse = new NodeSearchOnlyResponse( + new DiscoveryNode("node1", buildNewFakeTransportAddress(), Version.CURRENT), + Collections.singletonList(shardResponse) + ); + + List responses = Collections.singletonList(nodeResponse); + AtomicReference exceptionRef = new AtomicReference<>(); + + syncManager.validateNodeResponses(responses, new ActionListener() { + @Override + public void onResponse(SearchOnlyResponse response) { + fail("Expected failure due to uncommitted operations"); + } + + @Override + public void onFailure(Exception e) { + exceptionRef.set(e); + } + }); + + assertNotNull(exceptionRef.get()); + assertTrue(exceptionRef.get().getMessage().contains("uncommitted operations")); + } + + public void testValidateNodeResponses_failure_needsSync() { + // Create a shard response indicating that a shard needs sync. + ShardId shardId = new ShardId(new Index("test_index", "uuid"), 0); + ShardSearchOnlyResponse shardResponse = new ShardSearchOnlyResponse(shardId, true, 0); + NodeSearchOnlyResponse nodeResponse = new NodeSearchOnlyResponse( + new DiscoveryNode("node1", buildNewFakeTransportAddress(), Version.CURRENT), + Collections.singletonList(shardResponse) + ); + + List responses = Collections.singletonList(nodeResponse); + AtomicReference exceptionRef = new AtomicReference<>(); + + syncManager.validateNodeResponses(responses, new ActionListener() { + @Override + public void onResponse(SearchOnlyResponse response) { + fail("Expected failure due to sync needed"); + } + + @Override + public void onFailure(Exception e) { + exceptionRef.set(e); + } + }); + + assertNotNull(exceptionRef.get()); + assertTrue(exceptionRef.get().getMessage().contains("sync needed")); + } + + // --- Tests for getPrimaryShardAssignments --- + public void testGetPrimaryShardAssignments_withRouting() { + // Create index settings with an explicit uuid. + Settings indexSettings = Settings.builder() + .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) + .put("index.uuid", "uuid") + .build(); + // Build IndexMetadata using the index name. The builder will pick up the uuid from the settings. + IndexMetadata indexMetadata = IndexMetadata.builder("test_index") + .settings(indexSettings) + .numberOfShards(1) + .numberOfReplicas(1) + .build(); + + // Build a minimal routing table using the same index name and uuid. + Index index = new Index("test_index", "uuid"); + ShardId shardId = new ShardId(index, 0); + ShardRouting primaryShardRouting = TestShardRouting.newShardRouting(shardId, "node1", true, ShardRoutingState.STARTED); + IndexShardRoutingTable shardRoutingTable = new IndexShardRoutingTable.Builder(shardId).addShard(primaryShardRouting).build(); + IndexRoutingTable indexRoutingTable = new IndexRoutingTable.Builder(index).addIndexShard(shardRoutingTable).build(); + RoutingTable routingTable = RoutingTable.builder().add(indexRoutingTable).build(); + + // Build a cluster state that contains the routing table. + ClusterState state = ClusterState.builder(new ClusterName("test")).routingTable(routingTable).build(); + + // Invoke the method under test. + Map assignments = syncManager.getPrimaryShardAssignments(indexMetadata, state); + // We expect one mapping: shard0 -> "node1" + assertEquals(1, assignments.size()); + // Construct the expected shard id using the same Index (name and uuid). + ShardId expectedShardId = new ShardId(new Index("test_index", "uuid"), 0); + assertEquals("node1", assignments.get(expectedShardId)); + } + +} diff --git a/server/src/test/java/org/opensearch/action/admin/indices/scale/searchonly/TransportSearchOnlyActionTests.java b/server/src/test/java/org/opensearch/action/admin/indices/scale/searchonly/TransportSearchOnlyActionTests.java new file mode 100644 index 0000000000000..5c202ebbd4d1f --- /dev/null +++ b/server/src/test/java/org/opensearch/action/admin/indices/scale/searchonly/TransportSearchOnlyActionTests.java @@ -0,0 +1,451 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.admin.indices.scale.searchonly; + +import org.opensearch.Version; +import org.opensearch.action.support.ActionFilters; +import org.opensearch.action.support.clustermanager.AcknowledgedResponse; +import org.opensearch.cluster.ClusterName; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.ClusterStateUpdateTask; +import org.opensearch.cluster.block.ClusterBlock; +import org.opensearch.cluster.block.ClusterBlockLevel; +import org.opensearch.cluster.block.ClusterBlocks; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.metadata.IndexNameExpressionResolver; +import org.opensearch.cluster.metadata.Metadata; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.node.DiscoveryNodes; +import org.opensearch.cluster.routing.IndexRoutingTable; +import org.opensearch.cluster.routing.IndexShardRoutingTable; +import org.opensearch.cluster.routing.RoutingTable; +import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.cluster.routing.ShardRoutingState; +import org.opensearch.cluster.routing.TestShardRouting; +import org.opensearch.cluster.routing.allocation.AllocationService; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.concurrent.ThreadContext; +import org.opensearch.core.action.ActionListener; +import org.opensearch.core.index.Index; +import org.opensearch.core.index.shard.ShardId; +import org.opensearch.core.rest.RestStatus; +import org.opensearch.indices.IndicesService; +import org.opensearch.indices.replication.common.ReplicationType; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.threadpool.TestThreadPool; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.TransportResponseHandler; +import org.opensearch.transport.TransportService; +import org.junit.After; +import org.junit.Before; + +import java.util.Collections; +import java.util.EnumSet; +import java.util.concurrent.TimeUnit; + +import org.mockito.ArgumentCaptor; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class TransportSearchOnlyActionTests extends OpenSearchTestCase { + + private TransportService transportService; + private ClusterService clusterService; + private AllocationService allocationService; + private IndicesService indicesService; + private ThreadPool threadPool; + private TransportSearchOnlyAction action; + + @Before + public void setUp() throws Exception { + super.setUp(); + threadPool = new TestThreadPool("SearchOnlyActionTests"); + transportService = mock(TransportService.class); + clusterService = mock(ClusterService.class); + allocationService = mock(AllocationService.class); + indicesService = mock(IndicesService.class); + + action = new TransportSearchOnlyAction( + transportService, + clusterService, + threadPool, + new ActionFilters(Collections.emptySet()), + new IndexNameExpressionResolver(new ThreadContext(Settings.EMPTY)), + allocationService, + indicesService + ); + + // Setup basic cluster state + ClusterState.Builder stateBuilder = ClusterState.builder(new ClusterName("test")); + stateBuilder.nodes(DiscoveryNodes.builder().build()); + when(clusterService.state()).thenReturn(stateBuilder.build()); + } + + @After + public void tearDown() throws Exception { + ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS); + super.tearDown(); + } + + public void testScaleDownValidation() { + String indexName = "test_index"; + SearchOnlyRequest request = new SearchOnlyRequest(indexName, true); + + // Test validation when index doesn't exist + ClusterState state = createClusterStateWithoutIndex(indexName); + when(clusterService.state()).thenReturn(state); + + ActionListener listener = new ActionListener<>() { + @Override + public void onResponse(AcknowledgedResponse response) { + fail("Expected validation to fail"); + } + + @Override + public void onFailure(Exception e) { + assertTrue(e instanceof IllegalArgumentException); + assertEquals("Index [" + indexName + "] not found", e.getMessage()); + } + }; + + action.clusterManagerOperation(request, state, listener); + } + + public void testScaleDownWithSearchOnlyAlreadyEnabled() { + String indexName = "test_index"; + SearchOnlyRequest request = new SearchOnlyRequest(indexName, true); + + // Create cluster state with search-only already enabled + ClusterState state = createClusterStateWithSearchOnlyEnabled(indexName); + when(clusterService.state()).thenReturn(state); + + ActionListener listener = new ActionListener<>() { + @Override + public void onResponse(AcknowledgedResponse response) { + fail("Expected validation to fail"); + } + + @Override + public void onFailure(Exception e) { + assertTrue(e instanceof IllegalStateException); + assertEquals("Index [" + indexName + "] is already in search-only mode", e.getMessage()); + } + }; + + action.clusterManagerOperation(request, state, listener); + } + + public void testScaleUpValidation() { + String indexName = "test_index"; + SearchOnlyRequest request = new SearchOnlyRequest(indexName, false); + + // Test validation when index is not in search-only mode + ClusterState state = createClusterStateWithoutSearchOnly(indexName); + when(clusterService.state()).thenReturn(state); + + ActionListener listener = new ActionListener() { + @Override + public void onResponse(AcknowledgedResponse response) { + fail("Expected validation to fail"); + } + + @Override + public void onFailure(Exception e) { + assertTrue(e instanceof IllegalStateException); + assertEquals("Index [" + indexName + "] is not in search-only mode", e.getMessage()); + } + }; + + action.clusterManagerOperation(request, state, listener); + } + + private ClusterState createClusterStateWithoutIndex(String indexName) { + return ClusterState.builder(new ClusterName("test")).metadata(Metadata.builder().build()).build(); + } + + private ClusterState createClusterStateWithSearchOnlyEnabled(String indexName) { + Settings indexSettings = Settings.builder() + .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetadata.INDEX_BLOCKS_SEARCH_ONLY_SETTING.getKey(), true) + .build(); + + IndexMetadata indexMetadata = IndexMetadata.builder(indexName) + .settings(indexSettings) + .numberOfShards(1) + .numberOfReplicas(1) + .build(); + + return ClusterState.builder(new ClusterName("test")).metadata(Metadata.builder().put(indexMetadata, true).build()).build(); + } + + private ClusterState createClusterStateWithoutSearchOnly(String indexName) { + Settings indexSettings = Settings.builder() + .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetadata.INDEX_BLOCKS_SEARCH_ONLY_SETTING.getKey(), false) + .build(); + + IndexMetadata indexMetadata = IndexMetadata.builder(indexName) + .settings(indexSettings) + .numberOfShards(1) + .numberOfReplicas(1) + .build(); + + return ClusterState.builder(new ClusterName("test")).metadata(Metadata.builder().put(indexMetadata, true).build()).build(); + } + + public void testAddBlockClusterStateUpdateTask() { + String indexName = "test_index"; + SearchOnlyRequest request = new SearchOnlyRequest(indexName, true); + + // Create initial cluster state with necessary index metadata + Settings indexSettings = Settings.builder() + .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true) + .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) + .put(IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS, 1) + .build(); + + IndexMetadata indexMetadata = IndexMetadata.builder(indexName) + .settings(indexSettings) + .numberOfShards(1) + .numberOfReplicas(1) + .build(); + + ClusterState initialState = ClusterState.builder(new ClusterName("test")) + .metadata(Metadata.builder().put(indexMetadata, true).build()) + .build(); + + when(clusterService.state()).thenReturn(initialState); + + ActionListener listener = new ActionListener<>() { + @Override + public void onResponse(AcknowledgedResponse response) { + assertTrue("Expected block to be added successfully", response.isAcknowledged()); + } + + @Override + public void onFailure(Exception e) { + fail("Should not fail: " + e.getMessage()); + } + }; + + action.clusterManagerOperation(request, initialState, listener); + + // Verify that the appropriate block was added + verify(clusterService).submitStateUpdateTask(eq("add-block-index-to-scale " + indexName), any()); + } + + public void testFinalizeScaleDownTaskSimple() throws Exception { + String indexName = "test_index"; + SearchOnlyRequest request = new SearchOnlyRequest(indexName, true); + + // Create minimal index metadata that meets scale-down prerequisites. + Settings indexSettings = Settings.builder() + .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true) + .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) + .put(IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS, 1) + .build(); + IndexMetadata indexMetadata = IndexMetadata.builder(indexName) + .settings(indexSettings) + .numberOfShards(1) + .numberOfReplicas(1) + .build(); + + // Build a minimal routing table for the index. + Index index = indexMetadata.getIndex(); + ShardId shardId = new ShardId(index, 0); + ShardRouting primaryShardRouting = TestShardRouting.newShardRouting(shardId, "node1", true, ShardRoutingState.STARTED); + IndexShardRoutingTable shardRoutingTable = new IndexShardRoutingTable.Builder(shardId).addShard(primaryShardRouting).build(); + IndexRoutingTable indexRoutingTable = new IndexRoutingTable.Builder(index).addIndexShard(shardRoutingTable).build(); + RoutingTable routingTable = RoutingTable.builder().add(indexRoutingTable).build(); + + // Create a DiscoveryNode and include it in the cluster state's nodes. + DiscoveryNode node = new DiscoveryNode("node1", buildNewFakeTransportAddress(), Version.CURRENT); + DiscoveryNodes discoveryNodes = DiscoveryNodes.builder().add(node).localNodeId("node1").build(); + + // Build the complete cluster state with metadata, routing table, and nodes. + ClusterState initialState = ClusterState.builder(new ClusterName("test")) + .metadata(Metadata.builder().put(indexMetadata, true).build()) + .routingTable(routingTable) + .nodes(discoveryNodes) + .build(); + when(clusterService.state()).thenReturn(initialState); + + // Stub transportService.sendRequest so that any shard sync request immediately succeeds. + doAnswer(invocation -> { + TransportResponseHandler handler = invocation.getArgument(3); + handler.handleResponse(new NodeSearchOnlyResponse(node, Collections.emptyList())); + return null; + }).when(transportService) + .sendRequest( + any(DiscoveryNode.class), + eq(TransportSearchOnlyAction.NAME), + any(NodeSearchOnlyRequest.class), + any(TransportResponseHandler.class) + ); + + // Execute the scale-down operation. + action.clusterManagerOperation(request, initialState, new ActionListener<>() { + @Override + public void onResponse(AcknowledgedResponse response) {} + + @Override + public void onFailure(Exception e) { + fail("Operation should not fail: " + e.getMessage()); + } + }); + + // Capture the add-block task submitted by the action. + ArgumentCaptor captor = ArgumentCaptor.forClass(ClusterStateUpdateTask.class); + verify(clusterService).submitStateUpdateTask(eq("add-block-index-to-scale " + indexName), captor.capture()); + ClusterStateUpdateTask addBlockTask = captor.getValue(); + + // Create a new cluster state that is different from initialState. + // For example, add a dummy block to the index. + ClusterBlocks.Builder blocksBuilder = ClusterBlocks.builder().blocks(initialState.blocks()); + blocksBuilder.addIndexBlock( + indexName, + new ClusterBlock(123, "dummy", false, false, false, RestStatus.FORBIDDEN, EnumSet.of(ClusterBlockLevel.WRITE)) + ); + ClusterState newState = ClusterState.builder(initialState).blocks(blocksBuilder).build(); + + // Simulate the add-block task callback (with a changed state) to trigger finalize. + addBlockTask.clusterStateProcessed("test-source", initialState, newState); + + // Verify that the finalize-scale-down update task was submitted. + verify(clusterService).submitStateUpdateTask(eq("finalize-scale-down"), any(ClusterStateUpdateTask.class)); + } + + public void testScaleUpClusterStateUpdateTask() throws Exception { + String indexName = "test_index"; + SearchOnlyRequest request = new SearchOnlyRequest(indexName, false); + + // Create index metadata with search-only mode enabled. + Settings indexSettings = Settings.builder() + .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetadata.INDEX_BLOCKS_SEARCH_ONLY_SETTING.getKey(), true) + .build(); + IndexMetadata indexMetadata = IndexMetadata.builder(indexName) + .settings(indexSettings) + .numberOfShards(1) + .numberOfReplicas(1) + .build(); + + // Build a minimal routing table for the index. + Index index = indexMetadata.getIndex(); + ShardId shardId = new ShardId(index, 0); + // Create a dummy shard routing in STARTED state. + ShardRouting searchOnlyShardRouting = TestShardRouting.newShardRouting(shardId, "node1", true, ShardRoutingState.STARTED); + IndexShardRoutingTable shardRoutingTable = new IndexShardRoutingTable.Builder(shardId).addShard(searchOnlyShardRouting).build(); + IndexRoutingTable indexRoutingTable = new IndexRoutingTable.Builder(index).addIndexShard(shardRoutingTable).build(); + RoutingTable routingTable = RoutingTable.builder().add(indexRoutingTable).build(); + + // Build the complete cluster state with metadata and the routing table. + ClusterState initialState = ClusterState.builder(new ClusterName("test")) + .metadata(Metadata.builder().put(indexMetadata, true).build()) + .routingTable(routingTable) + .build(); + + // Stub allocationService.reroute to return a valid state. + ClusterState stateAfterReroute = ClusterState.builder(initialState).build(); + when(allocationService.reroute(any(ClusterState.class), anyString())).thenReturn(stateAfterReroute); + when(clusterService.state()).thenReturn(initialState); + + ActionListener listener = new ActionListener<>() { + @Override + public void onResponse(AcknowledgedResponse response) { + assertTrue("Expected scale up to complete successfully", response.isAcknowledged()); + } + + @Override + public void onFailure(Exception e) { + fail("Should not fail: " + e.getMessage()); + } + }; + + // Trigger the scale-up operation. + action.clusterManagerOperation(request, initialState, listener); + + // Capture the update task submitted for scaling up. + ArgumentCaptor captor = ArgumentCaptor.forClass(ClusterStateUpdateTask.class); + verify(clusterService).submitStateUpdateTask(eq("scale-up-index"), captor.capture()); + ClusterStateUpdateTask scaleUpTask = captor.getValue(); + + // Manually simulate execution of the scale-up task. + ClusterState updatedState = scaleUpTask.execute(initialState); + scaleUpTask.clusterStateProcessed("test-source", initialState, updatedState); + + // Verify that allocationService.reroute was called with the expected reason. + verify(allocationService).reroute(any(ClusterState.class), eq("restore indexing shards")); + } + + public void testScaleDownWithMissingIndex() { + String indexName = "non_existent_index"; + SearchOnlyRequest request = new SearchOnlyRequest(indexName, true); + + ClusterState state = ClusterState.builder(new ClusterName("test")).metadata(Metadata.builder().build()).build(); + + ActionListener listener = new ActionListener<>() { + @Override + public void onResponse(AcknowledgedResponse response) { + fail("Should fail for missing index"); + } + + @Override + public void onFailure(Exception e) { + assertTrue(e instanceof IllegalArgumentException); + assertEquals("Index [" + indexName + "] not found", e.getMessage()); + } + }; + + action.clusterManagerOperation(request, state, listener); + } + + public void testScaleUpWithSearchOnlyNotEnabled() { + String indexName = "test_index"; + SearchOnlyRequest request = new SearchOnlyRequest(indexName, false); + + Settings indexSettings = Settings.builder() + .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetadata.INDEX_BLOCKS_SEARCH_ONLY_SETTING.getKey(), false) + .build(); + + IndexMetadata indexMetadata = IndexMetadata.builder(indexName) + .settings(indexSettings) + .numberOfShards(1) + .numberOfReplicas(1) + .build(); + + ClusterState state = ClusterState.builder(new ClusterName("test")) + .metadata(Metadata.builder().put(indexMetadata, true).build()) + .build(); + + ActionListener listener = new ActionListener<>() { + @Override + public void onResponse(AcknowledgedResponse response) { + fail("Should fail when search-only is not enabled"); + } + + @Override + public void onFailure(Exception e) { + assertTrue(e instanceof IllegalStateException); + assertEquals("Index [" + indexName + "] is not in search-only mode", e.getMessage()); + } + }; + action.clusterManagerOperation(request, state, listener); + } +}