Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Search only replicas (scale to zero) with Reader/Writer Separation #17299

Draft
wants to merge 8 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions gradle/run.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,15 @@ testClusters {
plugin('plugins:'.concat(p))
}
}
setting 'opensearch.experimental.feature.read.write.split.enabled', 'true'
setting 'path.repo', '["/tmp/my-repo"]'
setting 'cluster.routing.allocation.enable', 'all'
setting 'cluster.remote_store.state.enabled', 'true'
setting 'node.attr.remote_store.segment.repository', 'my-repository'
setting 'node.attr.remote_store.translog.repository', 'my-repository'
setting 'node.attr.remote_store.repository.my-repository.type', 'fs'
setting 'node.attr.remote_store.state.repository', 'my-repository'
setting 'node.attr.remote_store.repository.my-repository.settings.location', '/tmp/my-repo'
}
}

Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,11 @@

package org.opensearch.indices.replication;

import org.opensearch.action.admin.cluster.remotestore.restore.RestoreRemoteStoreRequest;
import org.opensearch.action.admin.cluster.remotestore.restore.RestoreRemoteStoreResponse;
import org.opensearch.action.admin.indices.settings.get.GetSettingsResponse;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.common.settings.Settings;
Expand All @@ -21,7 +25,9 @@

import java.util.List;

import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;

Expand Down Expand Up @@ -102,8 +108,8 @@ private void bootstrapIndexWithOutSearchReplicas(ReplicationType replicationType

Settings settings = Settings.builder()
.put(super.indexSettings())
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
.put(SETTING_NUMBER_OF_SHARDS, 1)
.put(SETTING_NUMBER_OF_REPLICAS, 1)
.put(SETTING_NUMBER_OF_SEARCH_REPLICAS, 0)
.put(IndexMetadata.SETTING_REPLICATION_TYPE, replicationType)
.build();
Expand All @@ -114,13 +120,29 @@ private void bootstrapIndexWithOutSearchReplicas(ReplicationType replicationType
ensureGreen(INDEX_NAME);
}

public void testRemoteStoreRestoreFailsForSearchOnlyIndex() throws Exception {
bootstrapIndexWithSearchReplicas();
assertAcked(client().admin().indices().prepareSearchOnly(INDEX_NAME).setSearchOnly(true).get());

GetSettingsResponse settingsResponse = client().admin().indices().prepareGetSettings(INDEX_NAME).get();
assertEquals("true", settingsResponse.getSetting(INDEX_NAME, IndexMetadata.INDEX_BLOCKS_SEARCH_ONLY_SETTING.getKey()));

IllegalArgumentException exception = expectThrows(IllegalArgumentException.class, () -> {
PlainActionFuture<RestoreRemoteStoreResponse> future = PlainActionFuture.newFuture();
client().admin().cluster().restoreRemoteStore(new RestoreRemoteStoreRequest().indices(INDEX_NAME), future);
future.actionGet();
});

assertTrue(exception.getMessage().contains("Cannot restore index [" + INDEX_NAME + "] because search-only mode is enabled"));
}

private void bootstrapIndexWithSearchReplicas() throws InterruptedException {
startCluster(3);

Settings settings = Settings.builder()
.put(super.indexSettings())
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
.put(SETTING_NUMBER_OF_SHARDS, 1)
.put(SETTING_NUMBER_OF_REPLICAS, 1)
.put(SETTING_NUMBER_OF_SEARCH_REPLICAS, 1)
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,8 @@
import org.opensearch.action.admin.indices.resolve.ResolveIndexAction;
import org.opensearch.action.admin.indices.rollover.RolloverAction;
import org.opensearch.action.admin.indices.rollover.TransportRolloverAction;
import org.opensearch.action.admin.indices.scale.searchonly.SearchOnlyScaleAction;
import org.opensearch.action.admin.indices.scale.searchonly.TransportSearchOnlyAction;
import org.opensearch.action.admin.indices.segments.IndicesSegmentsAction;
import org.opensearch.action.admin.indices.segments.PitSegmentsAction;
import org.opensearch.action.admin.indices.segments.TransportIndicesSegmentsAction;
Expand Down Expand Up @@ -419,6 +421,7 @@
import org.opensearch.rest.action.admin.indices.RestResizeHandler;
import org.opensearch.rest.action.admin.indices.RestResolveIndexAction;
import org.opensearch.rest.action.admin.indices.RestRolloverIndexAction;
import org.opensearch.rest.action.admin.indices.RestSearchOnlyAction;
import org.opensearch.rest.action.admin.indices.RestSimulateIndexTemplateAction;
import org.opensearch.rest.action.admin.indices.RestSimulateTemplateAction;
import org.opensearch.rest.action.admin.indices.RestSyncedFlushAction;
Expand Down Expand Up @@ -685,6 +688,7 @@ public <Request extends ActionRequest, Response extends ActionResponse> void reg
actions.register(AutoPutMappingAction.INSTANCE, TransportAutoPutMappingAction.class);
actions.register(IndicesAliasesAction.INSTANCE, TransportIndicesAliasesAction.class);
actions.register(UpdateSettingsAction.INSTANCE, TransportUpdateSettingsAction.class);
actions.register(SearchOnlyScaleAction.INSTANCE, TransportSearchOnlyAction.class);
actions.register(AnalyzeAction.INSTANCE, TransportAnalyzeAction.class);
actions.register(PutIndexTemplateAction.INSTANCE, TransportPutIndexTemplateAction.class);
actions.register(GetIndexTemplatesAction.INSTANCE, TransportGetIndexTemplatesAction.class);
Expand Down Expand Up @@ -906,7 +910,7 @@ public void initRestHandlers(Supplier<DiscoveryNodes> nodesInCluster) {
registerHandler.accept(new RestUpgradeAction());
registerHandler.accept(new RestUpgradeStatusAction());
registerHandler.accept(new RestClearIndicesCacheAction());

registerHandler.accept(new RestSearchOnlyAction());
registerHandler.accept(new RestIndexAction());
registerHandler.accept(new CreateHandler());
registerHandler.accept(new AutoIdHandler(nodesInCluster));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.admin.indices.scale.searchonly;

import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.transport.TransportRequest;

import java.io.IOException;
import java.util.List;

/**
* A transport request sent to nodes to facilitate shard synchronization during search-only scaling operations.
* <p>
* This request is sent from the cluster manager to data nodes that host primary shards for the target index
* during scale operations. It contains the index name and a list of shard IDs that need to be synchronized
* before completing a scale-down operation.
* <p>
* When a node receives this request, it performs final sync and flush operations on the specified shards,
* ensuring all operations are committed and the remote store is synced. This is a crucial step in
* the scale-down process to ensure no data loss occurs when the index transitions to search-only mode.
*/
class NodeSearchOnlyRequest extends TransportRequest {
private final String index;
private final List<ShardId> shardIds;

/**
* Constructs a new NodeSearchOnlyRequest.
*
* @param index the name of the index being scaled
* @param shardIds the list of shard IDs to be synchronized on the target node
*/
NodeSearchOnlyRequest(String index, List<ShardId> shardIds) {
this.index = index;
this.shardIds = shardIds;
}

/**
* Deserialization constructor.
*
* @param in the stream input to read from
* @throws IOException if there is an I/O error during deserialization
*/
NodeSearchOnlyRequest(StreamInput in) throws IOException {
super(in);
this.index = in.readString();
this.shardIds = in.readList(ShardId::new);
}

/**
* Serializes this request to the given output stream.
*
* @param out the output stream to write to
* @throws IOException if there is an I/O error during serialization
*/
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(index);
out.writeList(shardIds);
}

/**
* Returns the index name associated with this request.
*
* @return the index name
*/
String getIndex() {
return index;
}

/**
* Returns the list of shard IDs to be synchronized.
*
* @return the list of shard IDs
*/
List<ShardId> getShardIds() {
return shardIds;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.admin.indices.scale.searchonly;

import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.transport.TransportResponse;

import java.io.IOException;
import java.util.List;

/**
* Response sent from nodes after processing a {@link NodeSearchOnlyRequest} during search-only scaling operations.
* <p>
* This response contains information about the node that processed the request and the results of
* synchronization attempts for each requested shard. The cluster manager uses these responses to
* determine whether it's safe to proceed with finalizing a scale-down operation.
* <p>
* Each response includes details about whether shards have any uncommitted operations or need
* additional synchronization, which would indicate the scale operation should be delayed until
* the cluster reaches a stable state.
*/
class NodeSearchOnlyResponse extends TransportResponse {
private final DiscoveryNode node;
private final List<ShardSearchOnlyResponse> shardResponses;

/**
* Constructs a new NodeSearchOnlyResponse.
*
* @param node the node that processed the synchronization request
* @param shardResponses the list of responses from individual shard synchronization attempts
*/
NodeSearchOnlyResponse(DiscoveryNode node, List<ShardSearchOnlyResponse> shardResponses) {
this.node = node;
this.shardResponses = shardResponses;
}

/**
* Deserialization constructor.
*
* @param in the stream input to read from
* @throws IOException if there is an I/O error during deserialization
*/
NodeSearchOnlyResponse(StreamInput in) throws IOException {
node = new DiscoveryNode(in);
shardResponses = in.readList(ShardSearchOnlyResponse::new);
}

/**
* Serializes this response to the given output stream.
*
* @param out the output stream to write to
* @throws IOException if there is an I/O error during serialization
*/
@Override
public void writeTo(StreamOutput out) throws IOException {
node.writeTo(out);
out.writeList(shardResponses);
}

/**
* Returns the node that processed the synchronization request.
*
* @return the discovery node information
*/
public DiscoveryNode getNode() {
return node;
}

/**
* Returns the list of shard-level synchronization responses.
* <p>
* These responses contain critical information about the state of each shard,
* including whether there are uncommitted operations or if additional synchronization
* is needed before the scale operation can safely proceed.
*
* @return the list of shard responses
*/
public List<ShardSearchOnlyResponse> getShardResponses() {
return shardResponses;
}
}
Loading
Loading