Skip to content

Commit

Permalink
[2.x] Adding new stop-replication action in ism
Browse files Browse the repository at this point in the history
Signed-off-by: aggarwalShivani <shivani.aggarwal@nokia.com>
  • Loading branch information
aggarwalShivani committed Feb 10, 2025
1 parent efb8cb6 commit af906fc
Show file tree
Hide file tree
Showing 12 changed files with 505 additions and 3 deletions.
8 changes: 8 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ buildscript {
job_scheduler_no_snapshot = opensearch_build
notifications_no_snapshot = opensearch_build
security_no_snapshot = opensearch_build
ccr_no_snapshot = opensearch_build
if (buildVersionQualifier) {
opensearch_build += "-${buildVersionQualifier}"
job_scheduler_no_snapshot += "-${buildVersionQualifier}"
Expand Down Expand Up @@ -66,6 +67,10 @@ buildscript {
kotlin_version = System.getProperty("kotlin.version", "1.8.21")

security_plugin_version = System.getProperty("security.version", opensearch_build)
ccr_version = System.getProperty("ccr.version", opensearch_build)
ccr_build_download = 'https://ci.opensearch.org/ci/dbc/distribution-build-opensearch/' + opensearch_no_snapshot +
'/latest/linux/x64/tar/builds/opensearch/plugins/opensearch-cross-cluster-replication-' + ccr_no_snapshot + '.zip'
ccr_resource_folder = "src/test/resources/replication"
}

repositories {
Expand Down Expand Up @@ -230,6 +235,7 @@ dependencies {
opensearchPlugin "org.opensearch.plugin:opensearch-notifications-core:${notifications_version}@zip"
opensearchPlugin "org.opensearch.plugin:notifications:${notifications_version}@zip"
opensearchPlugin "org.opensearch.plugin:opensearch-security:${security_plugin_version}@zip"
opensearchPlugin "org.opensearch.plugin:opensearch-cross-cluster-replication:${ccr_version}@zip"
}

repositories {
Expand Down Expand Up @@ -313,6 +319,7 @@ def jobSchedulerFile = resolvePluginFile("opensearch-job-scheduler")
def notificationsCoreFile = resolvePluginFile("opensearch-notifications-core")
def notificationsFile = resolvePluginFile("notifications")
def securityPluginFile = resolvePluginFile("opensearch-security")
// Plugin zip for cross-cluster replication; the artifact name must match the
// "opensearch-cross-cluster-replication" opensearchPlugin dependency declared above.
def ccrFile = resolvePluginFile("opensearch-cross-cluster-replication")

ext.getPluginResource = { download_to_folder, download_from_src ->
def src_split = download_from_src.split("/")
Expand Down Expand Up @@ -393,6 +400,7 @@ testClusters.integTest {
if (securityEnabled) {
plugin(provider(securityPluginFile))
}
plugin(provider(ccrFile))
setting 'path.repo', repo.absolutePath
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import org.opensearch.indexmanagement.indexstatemanagement.action.RolloverAction
import org.opensearch.indexmanagement.indexstatemanagement.action.RollupActionParser
import org.opensearch.indexmanagement.indexstatemanagement.action.ShrinkActionParser
import org.opensearch.indexmanagement.indexstatemanagement.action.SnapshotActionParser
import org.opensearch.indexmanagement.indexstatemanagement.action.StopReplicationActionParser
import org.opensearch.indexmanagement.indexstatemanagement.action.TransformActionParser
import org.opensearch.indexmanagement.spi.indexstatemanagement.Action
import org.opensearch.indexmanagement.spi.indexstatemanagement.ActionParser
Expand Down Expand Up @@ -51,6 +52,7 @@ class ISMActionsParser private constructor() {
RolloverActionParser(),
ShrinkActionParser(),
SnapshotActionParser(),
StopReplicationActionParser(),
TransformActionParser(),
)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.indexmanagement.indexstatemanagement.action

import org.opensearch.indexmanagement.indexstatemanagement.step.stopreplication.AttemptStopReplicationStep
import org.opensearch.indexmanagement.spi.indexstatemanagement.Action
import org.opensearch.indexmanagement.spi.indexstatemanagement.Step
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext

/**
 * ISM action that stops replication on indices replicated on a follower cluster.
 *
 * The action is made of exactly one step, [AttemptStopReplicationStep], which is
 * returned regardless of the supplied [StepContext].
 *
 * @param index forwarded unchanged to the [Action] base class.
 */
class StopReplicationAction(
    index: Int,
) : Action(name, index) {
    // Single step instance shared by getStepToExecute() and getSteps().
    private val stopStep = AttemptStopReplicationStep()

    private val allSteps = listOf(stopStep)

    /** Always returns the one stop-replication step; [context] is not consulted. */
    override fun getStepToExecute(context: StepContext): Step {
        return stopStep
    }

    /** Returns the one-element list holding the stop-replication step. */
    override fun getSteps(): List<Step> {
        return allSteps
    }

    companion object {
        const val name = "stop_replication"
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.indexmanagement.indexstatemanagement.action

import org.opensearch.core.common.io.stream.StreamInput
import org.opensearch.core.xcontent.XContentParser
import org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken
import org.opensearch.indexmanagement.spi.indexstatemanagement.Action
import org.opensearch.indexmanagement.spi.indexstatemanagement.ActionParser

/**
 * [ActionParser] that builds [StopReplicationAction] instances from a stream or from XContent.
 *
 * The XContent form carries no fields: the parser expects an empty object.
 */
class StopReplicationActionParser : ActionParser() {
    /** Returns the action type string, taken from [StopReplicationAction.name]. */
    override fun getActionType(): String {
        return StopReplicationAction.name
    }

    /** Reads a single int from [sin] and uses it as the action index. */
    override fun fromStreamInput(sin: StreamInput): Action = StopReplicationAction(sin.readInt())

    /**
     * Verifies that the parser is positioned on START_OBJECT and that the very next
     * token is END_OBJECT, then builds the action with the given [index].
     */
    override fun fromXContent(xcp: XContentParser, index: Int): Action {
        val openingToken = xcp.currentToken()
        ensureExpectedToken(XContentParser.Token.START_OBJECT, openingToken, xcp)

        val closingToken = xcp.nextToken()
        ensureExpectedToken(XContentParser.Token.END_OBJECT, closingToken, xcp)

        return StopReplicationAction(index)
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.indexmanagement.indexstatemanagement.step.stopreplication

import org.apache.logging.log4j.LogManager
import org.opensearch.ExceptionsHelper
import org.opensearch.action.support.master.AcknowledgedResponse
import org.opensearch.commons.replication.ReplicationPluginInterface
import org.opensearch.commons.replication.action.StopIndexReplicationRequest
import org.opensearch.indexmanagement.opensearchapi.suspendUntil
import org.opensearch.indexmanagement.spi.indexstatemanagement.Step
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepMetaData
import org.opensearch.snapshots.SnapshotInProgressException
import org.opensearch.transport.RemoteTransportException

/**
 * ISM step that asks the replication plugin to stop replication for the managed index.
 *
 * The outcome of [execute] is kept in [stepStatus] and [info] and is written into the
 * managed index metadata by [getUpdatedManagedIndexMetadata].
 */
class AttemptStopReplicationStep : Step(name) {
    private val logger = LogManager.getLogger(javaClass)
    private var stepStatus = StepStatus.STARTING
    private var info: Map<String, Any>? = null
    private var replicationPluginInterface: ReplicationPluginInterface = ReplicationPluginInterface()

    /** Replaces the [ReplicationPluginInterface] that [execute] calls. */
    fun setReplicationPluginInterface(replicationPluginInterface: ReplicationPluginInterface) {
        this.replicationPluginInterface = replicationPluginInterface
    }

    /**
     * Sends a [StopIndexReplicationRequest] for the index named in the step context and
     * records the result.
     *
     * - acknowledged response: status COMPLETED with the success message
     * - unacknowledged response: status FAILED with the failure message
     * - snapshot in progress (direct or wrapped in a [RemoteTransportException]):
     *   status CONDITION_NOT_MET, matching the "retrying" wording of the message
     * - any other exception: status FAILED, with the exception message stored as "cause"
     *
     * @return this step; returned untouched when no context has been set.
     */
    override suspend fun execute(): Step {
        val context = this.context ?: return this
        val indexName = context.metadata.index
        try {
            val stopIndexReplicationRequestObj = StopIndexReplicationRequest(indexName)
            val response: AcknowledgedResponse = context.client.admin().indices().suspendUntil {
                replicationPluginInterface.stopReplication(
                    context.client,
                    stopIndexReplicationRequestObj,
                    it,
                )
            }
            if (response.isAcknowledged) {
                stepStatus = StepStatus.COMPLETED
                info = mapOf("message" to getSuccessMessage(indexName))
            } else {
                val message = getFailedMessage(indexName)
                logger.warn(message)
                stepStatus = StepStatus.FAILED
                info = mapOf("message" to message)
            }
        } catch (e: RemoteTransportException) {
            val cause = ExceptionsHelper.unwrapCause(e)
            if (cause is SnapshotInProgressException) {
                handleSnapshotException(indexName, cause)
            } else {
                // unwrapCause returns a Throwable; fall back to the transport exception
                // itself instead of risking a ClassCastException on a non-Exception cause.
                handleException(indexName, cause as? Exception ?: e)
            }
        } catch (e: SnapshotInProgressException) {
            handleSnapshotException(indexName, e)
        } catch (e: Exception) {
            handleException(indexName, e)
        }
        return this
    }

    /**
     * Records a snapshot-in-progress outcome. The status is CONDITION_NOT_MET rather than
     * FAILED because the stored message tells the user the step is being retried.
     */
    private fun handleSnapshotException(indexName: String, e: SnapshotInProgressException) {
        val message = getSnapshotMessage(indexName)
        logger.warn(message, e)
        stepStatus = StepStatus.CONDITION_NOT_MET
        info = mapOf("message" to message)
    }

    /** Records a failure, adding the exception message as "cause" when it is non-null. */
    private fun handleException(indexName: String, e: Exception) {
        val message = getFailedMessage(indexName)
        logger.error(message, e)
        stepStatus = StepStatus.FAILED
        val mutableInfo = mutableMapOf("message" to message)
        val errorMessage = e.message
        if (errorMessage != null) mutableInfo["cause"] = errorMessage
        info = mutableInfo.toMap()
    }

    /**
     * Returns a copy of [currentMetadata] carrying this step's name, start time, status
     * and info, with `transitionTo` cleared.
     */
    override fun getUpdatedManagedIndexMetadata(currentMetadata: ManagedIndexMetaData): ManagedIndexMetaData = currentMetadata.copy(
        stepMetaData = StepMetaData(name, getStepStartTime(currentMetadata).toEpochMilli(), stepStatus),
        transitionTo = null,
        info = info,
    )

    override fun isIdempotent() = false

    companion object {
        const val name = "attempt_stop_replication"

        fun getFailedMessage(index: String) = "Failed to stop replication [index=$index]"

        fun getSuccessMessage(index: String) = "Successfully stopped replication [index=$index]"

        fun getSnapshotMessage(index: String) = "Index had snapshot in progress, retrying stop replication [index=$index]"
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ class ActionValidation(
"transition" -> ValidateTransition(settings, clusterService, jvmService).execute(indexName)
"close" -> ValidateClose(settings, clusterService, jvmService).execute(indexName)
"index_priority" -> ValidateIndexPriority(settings, clusterService, jvmService).execute(indexName)
"stop_replication" -> ValidateStopReplication(settings, clusterService, jvmService).execute(indexName)
// No validations for these actions at current stage.
// Reason: https://github.com/opensearch-project/index-management/issues/587
"notification" -> ValidateNothing(settings, clusterService, jvmService).execute(indexName)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.indexmanagement.indexstatemanagement.validation

import org.apache.logging.log4j.LogManager
import org.opensearch.cluster.metadata.MetadataCreateIndexService
import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.settings.Settings
import org.opensearch.indexmanagement.spi.indexstatemanagement.Validate
import org.opensearch.indexmanagement.util.OpenForTesting
import org.opensearch.indices.InvalidIndexNameException
import org.opensearch.monitor.jvm.JvmService

/**
 * Pre-execution validation for the stop_replication action.
 *
 * Validation passes only when the index is present in the cluster state metadata and
 * its name is accepted by [MetadataCreateIndexService.validateIndexOrAliasName].
 */
@OpenForTesting
class ValidateStopReplication(
    settings: Settings,
    clusterService: ClusterService,
    jvmService: JvmService,
) : Validate(settings, clusterService, jvmService) {
    private val logger = LogManager.getLogger(javaClass)

    /**
     * Runs both checks in order (existence first, then name validity; the second check
     * is skipped when the first fails). On failure the status is set to FAILED and the
     * message set by the failing check is kept; on success the passed message is stored.
     */
    @Suppress("ReturnSuppressCount", "ReturnCount")
    override fun execute(indexName: String): Validate {
        val checksPassed = indexExists(indexName) && validIndex(indexName)
        if (checksPassed) {
            validationMessage = getValidationPassedMessage(indexName)
        } else {
            validationStatus = ValidationStatus.FAILED
        }
        return this
    }

    /** Returns true when the cluster state metadata contains [indexName]; otherwise logs and records why. */
    private fun indexExists(indexName: String): Boolean {
        if (clusterService.state().metadata.indices.containsKey(indexName)) {
            return true
        }
        val warning = getNoIndexMessage(indexName)
        logger.warn(warning)
        validationMessage = warning
        return false
    }

    /**
     * Returns true when the index name passes the create-index name validation.
     * Any exception thrown by the validation is treated as "invalid"; only the generic
     * not-valid message is logged and recorded, not the exception's own reason.
     */
    private fun validIndex(indexName: String): Boolean {
        val exceptionGenerator: (String, String) -> RuntimeException = { invalidName, reason ->
            InvalidIndexNameException(invalidName, reason)
        }
        return try {
            MetadataCreateIndexService.validateIndexOrAliasName(indexName, exceptionGenerator)
            true
        } catch (e: Exception) {
            val warning = getIndexNotValidMessage(indexName)
            logger.warn(warning)
            validationMessage = warning
            false
        }
    }

    @Suppress("TooManyFunctions")
    companion object {
        const val name = "validate_stop_replication"

        fun getNoIndexMessage(index: String) = "No such index [index=$index] for stop replication action."

        fun getIndexNotValidMessage(index: String) = "Index [index=$index] is not valid. Abort stop replication action on it."

        fun getValidationPassedMessage(index: String) = "Stop replication action validation passed for [index=$index]"
    }
}
5 changes: 4 additions & 1 deletion src/main/resources/mappings/opendistro-ism-config.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"_meta" : {
"schema_version": 21
"schema_version": 22
},
"dynamic": "strict",
"properties": {
Expand Down Expand Up @@ -170,6 +170,9 @@
"delete": {
"type": "object"
},
"stop_replication": {
"type": "object"
},
"force_merge": {
"properties": {
"max_num_segments": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ import javax.management.remote.JMXServiceURL

abstract class IndexManagementRestTestCase : ODFERestTestCase() {

val configSchemaVersion = 21
val configSchemaVersion = 22
val historySchemaVersion = 7

// Having issues with tests leaking into other tests and mappings being incorrect and they are not caught by any pending task wait check as
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.indexmanagement.indexstatemanagement.action

import org.opensearch.indexmanagement.indexstatemanagement.IndexStateManagementRestTestCase
import org.opensearch.indexmanagement.indexstatemanagement.model.Policy
import org.opensearch.indexmanagement.indexstatemanagement.model.State
import org.opensearch.indexmanagement.indexstatemanagement.randomErrorNotification
import org.opensearch.indexmanagement.waitFor
import java.time.Instant
import java.time.temporal.ChronoUnit
import java.util.Locale

/**
 * Integration test for the stop_replication ISM action.
 */
class StopReplicationActionIT : IndexStateManagementRestTestCase() {
    private val testIndexName = javaClass.simpleName.lowercase(Locale.ROOT)

    /**
     * Applies a policy whose only state runs stop_replication to an index that is not
     * being replicated, and expects the explain info to report that no replication is
     * in progress for that index.
     */
    fun `test stop_replication on a non-replicated index`() {
        val indexName = "${testIndexName}_index_1"
        val policyID = "${testIndexName}_testPolicyName_1"

        // Single state containing only the stop_replication action and no transitions.
        val stopReplicationState = State("StopReplicationState", listOf(StopReplicationAction(0)), listOf())
        val policy = Policy(
            id = policyID,
            description = "$testIndexName description",
            schemaVersion = 1L,
            lastUpdatedTime = Instant.now().truncatedTo(ChronoUnit.MILLIS),
            errorNotification = randomErrorNotification(),
            defaultState = stopReplicationState.name,
            states = listOf(stopReplicationState),
        )
        createPolicy(policy, policyID)
        createIndex(indexName, policyID)

        val managedIndexConfig = getExistingManagedIndexConfig(indexName)

        // First cycle: move the job start time so it triggers shortly, then wait for the policy to be picked up.
        updateManagedIndexConfigStartTime(managedIndexConfig)
        waitFor {
            assertEquals(policyID, getExplainManagedIndexMetaData(indexName).policyID)
        }

        // Second cycle: trigger the job again so the action itself runs.
        updateManagedIndexConfigStartTime(managedIndexConfig)
        val expectedCause = "cause=No replication in progress for index:$indexName"
        waitFor {
            val explainInfo = getExplainManagedIndexMetaData(indexName).info.toString()
            assertTrue(explainInfo.contains(expectedCause))
        }
    }
}
Loading

0 comments on commit af906fc

Please sign in to comment.