From af906fc75071410b5e2efb5f0e3c9308586051d5 Mon Sep 17 00:00:00 2001 From: aggarwalShivani Date: Mon, 10 Feb 2025 23:38:29 +0530 Subject: [PATCH] [2.x] Adding new stop-replication action in ism Signed-off-by: aggarwalShivani --- build.gradle | 8 + .../indexstatemanagement/ISMActionsParser.kt | 2 + .../action/StopReplicationAction.kt | 30 +++ .../action/StopReplicationActionParser.kt | 28 +++ .../AttemptStopReplicationStep.kt | 99 +++++++++ .../validation/ActionValidation.kt | 1 + .../validation/ValidateStopReplication.kt | 72 +++++++ .../mappings/opendistro-ism-config.json | 5 +- .../IndexManagementRestTestCase.kt | 2 +- .../action/StopReplicationActionIT.kt | 57 +++++ .../step/AttemptStopReplicationStepTests.kt | 199 ++++++++++++++++++ .../cached-opendistro-ism-config.json | 5 +- 12 files changed, 505 insertions(+), 3 deletions(-) create mode 100644 src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/StopReplicationAction.kt create mode 100644 src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/StopReplicationActionParser.kt create mode 100644 src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/stopreplication/AttemptStopReplicationStep.kt create mode 100644 src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/validation/ValidateStopReplication.kt create mode 100644 src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/StopReplicationActionIT.kt create mode 100644 src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/AttemptStopReplicationStepTests.kt diff --git a/build.gradle b/build.gradle index 034edc133..c89bf51bb 100644 --- a/build.gradle +++ b/build.gradle @@ -35,6 +35,7 @@ buildscript { job_scheduler_no_snapshot = opensearch_build notifications_no_snapshot = opensearch_build security_no_snapshot = opensearch_build + ccr_no_snapshot = opensearch_build if (buildVersionQualifier) { opensearch_build += "-${buildVersionQualifier}" job_scheduler_no_snapshot += "-${buildVersionQualifier}" @@ -66,6 +67,10 @@ buildscript { kotlin_version = System.getProperty("kotlin.version", "1.8.21") security_plugin_version = System.getProperty("security.version", opensearch_build) + ccr_version = System.getProperty("ccr.version", opensearch_build) + ccr_build_download = 'https://ci.opensearch.org/ci/dbc/distribution-build-opensearch/' + opensearch_no_snapshot + + '/latest/linux/x64/tar/builds/opensearch/plugins/opensearch-cross-cluster-replication-' + ccr_no_snapshot + '.zip' + ccr_resource_folder = "src/test/resources/replication" } repositories { @@ -230,6 +235,7 @@ dependencies { opensearchPlugin "org.opensearch.plugin:opensearch-notifications-core:${notifications_version}@zip" opensearchPlugin "org.opensearch.plugin:notifications:${notifications_version}@zip" opensearchPlugin "org.opensearch.plugin:opensearch-security:${security_plugin_version}@zip" + opensearchPlugin "org.opensearch.plugin:opensearch-cross-cluster-replication:${ccr_version}@zip" } repositories { @@ -313,6 +319,7 @@ def jobSchedulerFile = resolvePluginFile("opensearch-job-scheduler") def notificationsCoreFile = resolvePluginFile("opensearch-notifications-core") def notificationsFile = resolvePluginFile("notifications") def securityPluginFile = resolvePluginFile("opensearch-security") +def ccrFile = resolvePluginFile("opensearch-security") ext.getPluginResource = { download_to_folder, download_from_src -> def src_split = download_from_src.split("/") @@ -393,6 +400,7 @@ testClusters.integTest { if (securityEnabled) { plugin(provider(securityPluginFile)) } + plugin(provider(ccrFile)) setting 'path.repo', repo.absolutePath } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ISMActionsParser.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ISMActionsParser.kt index a45cfddae..e3d5a6eb0 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ISMActionsParser.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ISMActionsParser.kt @@ -23,6 +23,7 @@ import org.opensearch.indexmanagement.indexstatemanagement.action.RolloverAction import org.opensearch.indexmanagement.indexstatemanagement.action.RollupActionParser import org.opensearch.indexmanagement.indexstatemanagement.action.ShrinkActionParser import org.opensearch.indexmanagement.indexstatemanagement.action.SnapshotActionParser +import org.opensearch.indexmanagement.indexstatemanagement.action.StopReplicationActionParser import org.opensearch.indexmanagement.indexstatemanagement.action.TransformActionParser import org.opensearch.indexmanagement.spi.indexstatemanagement.Action import org.opensearch.indexmanagement.spi.indexstatemanagement.ActionParser @@ -51,6 +52,7 @@ class ISMActionsParser private constructor() { RolloverActionParser(), ShrinkActionParser(), SnapshotActionParser(), + StopReplicationActionParser(), TransformActionParser(), ) diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/StopReplicationAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/StopReplicationAction.kt new file mode 100644 index 000000000..9359b0bc3 --- /dev/null +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/StopReplicationAction.kt @@ -0,0 +1,30 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.indexstatemanagement.action + +import org.opensearch.indexmanagement.indexstatemanagement.step.stopreplication.AttemptStopReplicationStep +import org.opensearch.indexmanagement.spi.indexstatemanagement.Action +import org.opensearch.indexmanagement.spi.indexstatemanagement.Step +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext + +/** + * ISM action to stop replication on indices replicated on a follower cluster. + */ +class StopReplicationAction( + index: Int, +) : Action(name, index) { + companion object { + const val name = "stop_replication" + } + + private val attemptStopReplicationStep = AttemptStopReplicationStep() + + private val steps = listOf(attemptStopReplicationStep) + + override fun getStepToExecute(context: StepContext): Step = attemptStopReplicationStep + + override fun getSteps(): List = steps +} diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/StopReplicationActionParser.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/StopReplicationActionParser.kt new file mode 100644 index 000000000..15c8ed8ed --- /dev/null +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/StopReplicationActionParser.kt @@ -0,0 +1,28 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.indexstatemanagement.action + +import org.opensearch.core.common.io.stream.StreamInput +import org.opensearch.core.xcontent.XContentParser +import org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken +import org.opensearch.indexmanagement.spi.indexstatemanagement.Action +import org.opensearch.indexmanagement.spi.indexstatemanagement.ActionParser + +class StopReplicationActionParser : ActionParser() { + override fun fromStreamInput(sin: StreamInput): Action { + val index = sin.readInt() + return StopReplicationAction(index) + } + + override fun fromXContent(xcp: XContentParser, index: Int): Action { + ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp) + ensureExpectedToken(XContentParser.Token.END_OBJECT, xcp.nextToken(), xcp) + + return StopReplicationAction(index) + } + + override fun getActionType(): String = StopReplicationAction.name +} diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/stopreplication/AttemptStopReplicationStep.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/stopreplication/AttemptStopReplicationStep.kt new file mode 100644 index 000000000..0ad5053d3 --- /dev/null +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/stopreplication/AttemptStopReplicationStep.kt @@ -0,0 +1,99 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.indexstatemanagement.step.stopreplication + +import org.apache.logging.log4j.LogManager +import org.opensearch.ExceptionsHelper +import org.opensearch.action.support.master.AcknowledgedResponse +import org.opensearch.commons.replication.ReplicationPluginInterface +import org.opensearch.commons.replication.action.StopIndexReplicationRequest +import org.opensearch.indexmanagement.opensearchapi.suspendUntil +import org.opensearch.indexmanagement.spi.indexstatemanagement.Step +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepMetaData +import org.opensearch.snapshots.SnapshotInProgressException +import org.opensearch.transport.RemoteTransportException + +class AttemptStopReplicationStep : Step(name) { + private val logger = LogManager.getLogger(javaClass) + private var stepStatus = StepStatus.STARTING + private var info: Map? = null + private var replicationPluginInterface: ReplicationPluginInterface = ReplicationPluginInterface() + fun setReplicationPluginInterface(replicationPluginInterface: ReplicationPluginInterface) { + this.replicationPluginInterface = replicationPluginInterface + } + + override suspend fun execute(): Step { + val context = this.context ?: return this + val indexName = context.metadata.index + try { + val stopIndexReplicationRequestObj = StopIndexReplicationRequest(indexName) + val response: AcknowledgedResponse = context.client.admin().indices().suspendUntil { + replicationPluginInterface.stopReplication( + context.client, + stopIndexReplicationRequestObj, + it, + ) + } + if (response.isAcknowledged) { + stepStatus = StepStatus.COMPLETED + info = mapOf("message" to getSuccessMessage(indexName)) + } else { + val message = getFailedMessage(indexName) + logger.warn(message) + stepStatus = StepStatus.FAILED + info = mapOf("message" to message) + } + } catch (e: RemoteTransportException) { + val cause = ExceptionsHelper.unwrapCause(e) + if (cause is SnapshotInProgressException) { + handleSnapshotException(indexName, cause) + } else { + handleException(indexName, cause as Exception) + } + } catch (e: SnapshotInProgressException) { + handleSnapshotException(indexName, e) + } catch (e: Exception) { + handleException(indexName, e) + } + return this + } + + private fun handleSnapshotException(indexName: String, e: SnapshotInProgressException) { + val message = getSnapshotMessage(indexName) + logger.error(message, e) + stepStatus = StepStatus.FAILED + info = mapOf("message" to message) + } + + private fun handleException(indexName: String, e: Exception) { + val message = getFailedMessage(indexName) + logger.error(message, e) + stepStatus = StepStatus.FAILED + val mutableInfo = mutableMapOf("message" to message) + val errorMessage = e.message + if (errorMessage != null) mutableInfo["cause"] = errorMessage + info = mutableInfo.toMap() + } + + override fun getUpdatedManagedIndexMetadata(currentMetadata: ManagedIndexMetaData): ManagedIndexMetaData = currentMetadata.copy( + stepMetaData = StepMetaData(name, getStepStartTime(currentMetadata).toEpochMilli(), stepStatus), + transitionTo = null, + info = info, + ) + + override fun isIdempotent() = false + + companion object { + const val name = "attempt_stop_replication" + + fun getFailedMessage(index: String) = "Failed to stop replication [index=$index]" + + fun getSuccessMessage(index: String) = "Successfully stopped replication [index=$index]" + + fun getSnapshotMessage(index: String) = "Index had snapshot in progress, retrying stop replication [index=$index]" + } +} diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/validation/ActionValidation.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/validation/ActionValidation.kt index 4a0a1fa97..456562119 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/validation/ActionValidation.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/validation/ActionValidation.kt @@ -33,6 +33,7 @@ class ActionValidation( "transition" -> ValidateTransition(settings, clusterService, jvmService).execute(indexName) "close" -> ValidateClose(settings, clusterService, jvmService).execute(indexName) "index_priority" -> ValidateIndexPriority(settings, clusterService, jvmService).execute(indexName) + "stop_replication" -> ValidateStopReplication(settings, clusterService, jvmService).execute(indexName) // No validations for these actions at current stage. // Reason: https://github.com/opensearch-project/index-management/issues/587 "notification" -> ValidateNothing(settings, clusterService, jvmService).execute(indexName) diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/validation/ValidateStopReplication.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/validation/ValidateStopReplication.kt new file mode 100644 index 000000000..b0ef52d10 --- /dev/null +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/validation/ValidateStopReplication.kt @@ -0,0 +1,72 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.indexstatemanagement.validation + +import org.apache.logging.log4j.LogManager +import org.opensearch.cluster.metadata.MetadataCreateIndexService +import org.opensearch.cluster.service.ClusterService +import org.opensearch.common.settings.Settings +import org.opensearch.indexmanagement.spi.indexstatemanagement.Validate +import org.opensearch.indexmanagement.util.OpenForTesting +import org.opensearch.indices.InvalidIndexNameException +import org.opensearch.monitor.jvm.JvmService + +@OpenForTesting +class ValidateStopReplication( + settings: Settings, + clusterService: ClusterService, + jvmService: JvmService, +) : Validate(settings, clusterService, jvmService) { + private val logger = LogManager.getLogger(javaClass) + + @Suppress("ReturnSuppressCount", "ReturnCount") + override fun execute(indexName: String): Validate { + // if these conditions are false, fail validation and do not execute stop_replication action + if (!indexExists(indexName) || !validIndex(indexName)) { + validationStatus = ValidationStatus.FAILED + return this + } + validationMessage = getValidationPassedMessage(indexName) + return this + } + + private fun indexExists(indexName: String): Boolean { + val isIndexExists = clusterService.state().metadata.indices.containsKey(indexName) + if (!isIndexExists) { + val message = getNoIndexMessage(indexName) + logger.warn(message) + validationMessage = message + return false + } + return true + } + + private fun validIndex(indexName: String): Boolean { + val exceptionGenerator: (String, String) -> RuntimeException = { index_name, reason -> InvalidIndexNameException(index_name, reason) } + // If the index name is invalid for any reason, this will throw an exception giving the reason why in the message. + // That will be displayed to the user as the cause. + try { + MetadataCreateIndexService.validateIndexOrAliasName(indexName, exceptionGenerator) + } catch (e: Exception) { + val message = getIndexNotValidMessage(indexName) + logger.warn(message) + validationMessage = message + return false + } + return true + } + + @Suppress("TooManyFunctions") + companion object { + const val name = "validate_stop_replication" + + fun getNoIndexMessage(index: String) = "No such index [index=$index] for stop replication action." + + fun getIndexNotValidMessage(index: String) = "Index [index=$index] is not valid. Abort stop replication action on it." + + fun getValidationPassedMessage(index: String) = "Stop replication action validation passed for [index=$index]" + } +} diff --git a/src/main/resources/mappings/opendistro-ism-config.json b/src/main/resources/mappings/opendistro-ism-config.json index 4c138a267..273e0a02c 100644 --- a/src/main/resources/mappings/opendistro-ism-config.json +++ b/src/main/resources/mappings/opendistro-ism-config.json @@ -1,6 +1,6 @@ { "_meta" : { - "schema_version": 21 + "schema_version": 22 }, "dynamic": "strict", "properties": { @@ -170,6 +170,9 @@ "delete": { "type": "object" }, + "stop_replication": { + "type": "object" + }, "force_merge": { "properties": { "max_num_segments": { diff --git a/src/test/kotlin/org/opensearch/indexmanagement/IndexManagementRestTestCase.kt b/src/test/kotlin/org/opensearch/indexmanagement/IndexManagementRestTestCase.kt index 2f8ef3e2e..5cd1dbf43 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/IndexManagementRestTestCase.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/IndexManagementRestTestCase.kt @@ -43,7 +43,7 @@ import javax.management.remote.JMXServiceURL abstract class IndexManagementRestTestCase : ODFERestTestCase() { - val configSchemaVersion = 21 + val configSchemaVersion = 22 val historySchemaVersion = 7 // Having issues with tests leaking into other tests and mappings being incorrect and they are not caught by any pending task wait check as diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/StopReplicationActionIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/StopReplicationActionIT.kt new file mode 100644 index 000000000..5861363d5 --- /dev/null +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/StopReplicationActionIT.kt @@ -0,0 +1,57 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.indexstatemanagement.action + +import org.opensearch.indexmanagement.indexstatemanagement.IndexStateManagementRestTestCase +import org.opensearch.indexmanagement.indexstatemanagement.model.Policy +import org.opensearch.indexmanagement.indexstatemanagement.model.State +import org.opensearch.indexmanagement.indexstatemanagement.randomErrorNotification +import org.opensearch.indexmanagement.waitFor +import java.time.Instant +import java.time.temporal.ChronoUnit +import java.util.Locale + +class StopReplicationActionIT : IndexStateManagementRestTestCase() { + private val testIndexName = javaClass.simpleName.lowercase(Locale.ROOT) + + fun `test stop_replication on a non-replicated index`() { + val indexName = "${testIndexName}_index_1" + val policyID = "${testIndexName}_testPolicyName_1" + val actionConfig = StopReplicationAction(0) + val states = + listOf( + State("StopReplicationState", listOf(actionConfig), listOf()), + ) + + val policy = + Policy( + id = policyID, + description = "$testIndexName description", + schemaVersion = 1L, + lastUpdatedTime = Instant.now().truncatedTo(ChronoUnit.MILLIS), + errorNotification = randomErrorNotification(), + defaultState = states[0].name, + states = states, + ) + createPolicy(policy, policyID) + createIndex(indexName, policyID) + val managedIndexConfig = getExistingManagedIndexConfig(indexName) + // Change the start time so the job will trigger in 2 seconds. + updateManagedIndexConfigStartTime(managedIndexConfig) + + waitFor { assertEquals(policyID, getExplainManagedIndexMetaData(indexName).policyID) } + + // Need to wait two cycles. + // Change the start time so the job will trigger in 2 seconds. + updateManagedIndexConfigStartTime(managedIndexConfig) + waitFor { + val metadataInfo = getExplainManagedIndexMetaData(indexName).info.toString() + assertTrue( + metadataInfo.contains("cause=No replication in progress for index:" + indexName), + ) + } + } +} diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/AttemptStopReplicationStepTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/AttemptStopReplicationStepTests.kt new file mode 100644 index 000000000..8294eba7b --- /dev/null +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/AttemptStopReplicationStepTests.kt @@ -0,0 +1,199 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.indexstatemanagement.step + +import com.nhaarman.mockitokotlin2.any +import com.nhaarman.mockitokotlin2.mock +import com.nhaarman.mockitokotlin2.whenever +import kotlinx.coroutines.runBlocking +import org.opensearch.action.support.master.AcknowledgedResponse +import org.opensearch.client.AdminClient +import org.opensearch.client.Client +import org.opensearch.client.IndicesAdminClient +import org.opensearch.cluster.service.ClusterService +import org.opensearch.common.settings.Settings +import org.opensearch.commons.replication.ReplicationPluginInterface +import org.opensearch.core.action.ActionListener +import org.opensearch.indexmanagement.indexstatemanagement.step.stopreplication.AttemptStopReplicationStep +import org.opensearch.indexmanagement.spi.indexstatemanagement.Step +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext +import org.opensearch.jobscheduler.spi.utils.LockService +import org.opensearch.script.ScriptService +import org.opensearch.test.OpenSearchTestCase + +class AttemptStopReplicationStepTests : OpenSearchTestCase() { + private val clusterService: ClusterService = mock() + private val scriptService: ScriptService = mock() + private val settings: Settings = Settings.EMPTY + private val lockService: LockService = LockService(mock(), clusterService) + + fun `test stop replication step sets step status to completed when successful`() { + val stopReplicationResponse = AcknowledgedResponse(true) + val indicesAdminClient: IndicesAdminClient = mock() + val client = getClient(getAdminClient(indicesAdminClient)) + + runBlocking { + val replicationPluginInterface: ReplicationPluginInterface = mock() + whenever(replicationPluginInterface.stopReplication(any(), any(), any())) + .thenAnswer { invocation -> + val listener = invocation.getArgument>(2) + listener.onResponse(stopReplicationResponse) // Simulate a successful response + } + val managedIndexMetaData = ManagedIndexMetaData( + "test", + "indexUuid", + "policy_id", + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + ) + val context = StepContext( + managedIndexMetaData, + clusterService, + client, + null, + null, + scriptService, + settings, + lockService, + ) + val attemptStopReplicationStep = AttemptStopReplicationStep() + attemptStopReplicationStep.setReplicationPluginInterface(replicationPluginInterface) + attemptStopReplicationStep.preExecute(logger, context).execute() + val updatedManagedIndexMetaData = attemptStopReplicationStep.getUpdatedManagedIndexMetadata(managedIndexMetaData) + assertEquals( + "Step status is not COMPLETED", + Step.StepStatus.COMPLETED, + updatedManagedIndexMetaData.stepMetaData?.stepStatus, + ) + } + } + + fun `test stop replication step sets step status to failed when not acknowledged`() { + val stopReplicationResponse = AcknowledgedResponse(false) + val indicesAdminClient: IndicesAdminClient = mock() + val client = getClient(getAdminClient(indicesAdminClient)) + println("Client class: " + client::class.java.name) + + runBlocking { + val replicationPluginInterface: ReplicationPluginInterface = mock() + whenever(replicationPluginInterface.stopReplication(any(), any(), any())) + .thenAnswer { invocation -> + val listener = invocation.getArgument>(2) + listener.onResponse(stopReplicationResponse) // Simulate a successful response + } + val managedIndexMetaData = ManagedIndexMetaData( + "test", + "indexUuid", + "policy_id", + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + ) + val context = StepContext( + managedIndexMetaData, + clusterService, + client, + null, + null, + scriptService, + settings, + lockService, + ) + val attemptStopReplicationStep = AttemptStopReplicationStep() + attemptStopReplicationStep.setReplicationPluginInterface(replicationPluginInterface) + attemptStopReplicationStep.preExecute(logger, context).execute() + val updatedManagedIndexMetaData = attemptStopReplicationStep.getUpdatedManagedIndexMetadata(managedIndexMetaData) + assertEquals( + "Step status is not FAILED", + Step.StepStatus.FAILED, + updatedManagedIndexMetaData.stepMetaData?.stepStatus, + ) + } + } + + fun `test stop replication step sets step status to failed when error thrown`() { + val indicesAdminClient: IndicesAdminClient = mock() + val client = getClient(getAdminClient(indicesAdminClient)) + val exception = Exception("Test exception") + + runBlocking { + val replicationPluginInterface: ReplicationPluginInterface = mock() + whenever(replicationPluginInterface.stopReplication(any(), any(), any())) + .thenAnswer { invocation -> + val listener = invocation.getArgument>(2) + listener.onFailure(exception) // Simulate a successful response + } + + val managedIndexMetaData = ManagedIndexMetaData( + "test", + "indexUuid", + "policy_id", + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + ) + val context = StepContext( + managedIndexMetaData, + clusterService, + client, + null, + null, + scriptService, + settings, + lockService, + ) + val attemptStopReplicationStep = AttemptStopReplicationStep() + attemptStopReplicationStep.setReplicationPluginInterface(replicationPluginInterface) + attemptStopReplicationStep.preExecute(logger, context).execute() + val updatedManagedIndexMetaData = attemptStopReplicationStep.getUpdatedManagedIndexMetadata(managedIndexMetaData) + println("Step status for 3rd test: " + updatedManagedIndexMetaData.stepMetaData?.stepStatus) + assertEquals( + "Step status is not FAILED", + Step.StepStatus.FAILED, + updatedManagedIndexMetaData.stepMetaData?.stepStatus, + ) + println("Step status for 3rd test: " + updatedManagedIndexMetaData.stepMetaData?.toString()) + } + } + + private fun getClient(adminClient: AdminClient): Client { + val mockClient = mock() + whenever(mockClient.admin()).thenReturn(adminClient) + return mockClient + } + + private fun getAdminClient(indicesAdminClient: IndicesAdminClient): AdminClient { + val mockAdminClient = mock() + whenever(mockAdminClient.indices()).thenReturn(indicesAdminClient) + return mockAdminClient + } +} diff --git a/src/test/resources/mappings/cached-opendistro-ism-config.json b/src/test/resources/mappings/cached-opendistro-ism-config.json index 4c138a267..273e0a02c 100644 --- a/src/test/resources/mappings/cached-opendistro-ism-config.json +++ b/src/test/resources/mappings/cached-opendistro-ism-config.json @@ -1,6 +1,6 @@ { "_meta" : { - "schema_version": 21 + "schema_version": 22 }, "dynamic": "strict", "properties": { @@ -170,6 +170,9 @@ "delete": { "type": "object" }, + "stop_replication": { + "type": "object" + }, "force_merge": { "properties": { "max_num_segments": {