diff --git a/src/main/kotlin/org/opensearch/commons/replication/ReplicationPluginInterface.kt b/src/main/kotlin/org/opensearch/commons/replication/ReplicationPluginInterface.kt new file mode 100644 index 00000000..e059f404 --- /dev/null +++ b/src/main/kotlin/org/opensearch/commons/replication/ReplicationPluginInterface.kt @@ -0,0 +1,68 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.commons.replication + +import org.opensearch.action.support.clustermanager.AcknowledgedResponse +import org.opensearch.transport.client.Client +import org.opensearch.transport.client.node.NodeClient +import org.opensearch.commons.replication.action.ReplicationActions.INTERNAL_STOP_REPLICATION_ACTION_TYPE +import org.opensearch.commons.replication.action.StopIndexReplicationRequest +import org.opensearch.commons.utils.recreateObject +import org.opensearch.core.action.ActionListener +import org.opensearch.core.action.ActionResponse +import org.opensearch.core.common.io.stream.Writeable + +/** + * Transport action plugin interfaces for the cross-cluster-replication plugin. + */ +open class ReplicationPluginInterface { + + /** + * Stop replication. + * @param client Node client for making transport action + * @param request The request object + * @param listener The listener for getting response + */ + + open fun stopReplication( + client: Client, + request: StopIndexReplicationRequest, + listener: ActionListener + ) { + val nodeClient = client as NodeClient + return nodeClient.execute( + INTERNAL_STOP_REPLICATION_ACTION_TYPE, + request, + wrapActionListener(listener) { response -> + recreateObject(response) { + AcknowledgedResponse(it) + } + } + ) + } + + /** + * Wrap action listener on concrete response class by a new created one on ActionResponse. + * This is required because the response may be loaded by different classloader across plugins. + * The onResponse(ActionResponse) avoids type cast exception and give a chance to recreate + * the response object. + */ + @Suppress("UNCHECKED_CAST") + private fun wrapActionListener( + listener: ActionListener, + recreate: (Writeable) -> Response + ): ActionListener { + return object : ActionListener { + override fun onResponse(response: ActionResponse) { + val recreated = response as? Response ?: recreate(response) + listener.onResponse(recreated) + } + + override fun onFailure(exception: java.lang.Exception) { + listener.onFailure(exception) + } + } as ActionListener + } +} diff --git a/src/main/kotlin/org/opensearch/commons/replication/action/ReplicationActions.kt b/src/main/kotlin/org/opensearch/commons/replication/action/ReplicationActions.kt new file mode 100644 index 00000000..711fb217 --- /dev/null +++ b/src/main/kotlin/org/opensearch/commons/replication/action/ReplicationActions.kt @@ -0,0 +1,30 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.commons.replication.action + +import org.opensearch.action.ActionType +import org.opensearch.action.support.clustermanager.AcknowledgedResponse + +/** + * Information related to the transport stop replication action for the Replication plugin + */ +object ReplicationActions { + + /** + * Action names for stopping replication + * STOP_REPLICATION_ACTION_NAME: action used for _replication/_stop REST API + * INTERNAL_STOP_REPLICATION_ACTION_NAME: Internal only - Used by Index Management plugin to invoke stop replication + */ + const val STOP_REPLICATION_ACTION_NAME = "indices:admin/plugins/replication/index/stop" + const val INTERNAL_STOP_REPLICATION_ACTION_NAME = "indices:internal/plugins/replication/index/stop" + + /** + * Stop replication transport action types. + */ + val STOP_REPLICATION_ACTION_TYPE = + ActionType(STOP_REPLICATION_ACTION_NAME, ::AcknowledgedResponse) + val INTERNAL_STOP_REPLICATION_ACTION_TYPE = + ActionType(INTERNAL_STOP_REPLICATION_ACTION_NAME, ::AcknowledgedResponse) +} diff --git a/src/main/kotlin/org/opensearch/commons/replication/action/StopIndexReplicationRequest.kt b/src/main/kotlin/org/opensearch/commons/replication/action/StopIndexReplicationRequest.kt new file mode 100644 index 00000000..e4940ea3 --- /dev/null +++ b/src/main/kotlin/org/opensearch/commons/replication/action/StopIndexReplicationRequest.kt @@ -0,0 +1,70 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.commons.replication.action + +import org.opensearch.action.ActionRequestValidationException +import org.opensearch.action.IndicesRequest +import org.opensearch.action.support.IndicesOptions +import org.opensearch.action.support.clustermanager.AcknowledgedRequest +import org.opensearch.core.common.io.stream.StreamInput +import org.opensearch.core.common.io.stream.StreamOutput +import org.opensearch.core.xcontent.ObjectParser +import org.opensearch.core.xcontent.ToXContent +import org.opensearch.core.xcontent.ToXContentObject +import org.opensearch.core.xcontent.XContentBuilder +import org.opensearch.core.xcontent.XContentParser + +class StopIndexReplicationRequest : + AcknowledgedRequest, IndicesRequest.Replaceable, ToXContentObject { + lateinit var indexName: String + constructor(indexName: String) { + this.indexName = indexName + } + + private constructor() { + } + + constructor(inp: StreamInput) : super(inp) { + indexName = inp.readString() + } + companion object { + private val PARSER = ObjectParser("StopReplicationRequestParser") { + StopIndexReplicationRequest() + } + + fun fromXContent(parser: XContentParser, followerIndex: String): StopIndexReplicationRequest { + val stopIndexReplicationRequest = PARSER.parse(parser, null) + stopIndexReplicationRequest.indexName = followerIndex + return stopIndexReplicationRequest + } + } + + override fun validate(): ActionRequestValidationException? { + return null + } + + override fun indices(vararg indices: String?): IndicesRequest { + return this + } + override fun indices(): Array { + return arrayOf(indexName) + } + + override fun indicesOptions(): IndicesOptions { + return IndicesOptions.strictSingleIndexNoExpandForbidClosed() + } + + override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { + builder.startObject() + builder.field("indexName", indexName) + builder.endObject() + return builder + } + + override fun writeTo(out: StreamOutput) { + super.writeTo(out) + out.writeString(indexName) + } +} diff --git a/src/test/kotlin/org/opensearch/commons/replication/ReplicationPluginInterfaceTests.kt b/src/test/kotlin/org/opensearch/commons/replication/ReplicationPluginInterfaceTests.kt new file mode 100644 index 00000000..e07c2bef --- /dev/null +++ b/src/test/kotlin/org/opensearch/commons/replication/ReplicationPluginInterfaceTests.kt @@ -0,0 +1,67 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.commons.replication + +import com.nhaarman.mockitokotlin2.whenever +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.extension.ExtendWith +import org.mockito.Mockito.any +import org.mockito.Mockito.mock +import org.mockito.Mockito.verify +import org.mockito.junit.jupiter.MockitoExtension +import org.opensearch.action.support.clustermanager.AcknowledgedResponse +import org.opensearch.transport.client.node.NodeClient +import org.opensearch.commons.replication.action.StopIndexReplicationRequest +import org.opensearch.core.action.ActionListener +import org.opensearch.core.action.ActionResponse + +@ExtendWith(MockitoExtension::class) +internal class ReplicationPluginInterfaceTests { + + @Test + fun `test stopReplication successful response`() { + // Mock dependencies + val client: NodeClient = mock() + val request: StopIndexReplicationRequest = mock() + val listener: ActionListener = mock() + val acknowledgedResponse = AcknowledgedResponse(true) // Successful response + + // Mock the behavior of NodeClient.execute() + whenever(client.execute(any(), any(), any>())) + .thenAnswer { invocation -> + val actionListener = invocation.getArgument>(2) + actionListener.onResponse(acknowledgedResponse) // Simulate success + } + + val replicationPluginInterface = ReplicationPluginInterface() + // Call method under test + replicationPluginInterface.stopReplication(client, request, listener) + // Verify that listener.onResponse is called with the correct response + verify(listener).onResponse(acknowledgedResponse) + } + + @Test + fun `test stopReplication failure response`() { + // Mock dependencies + val client: NodeClient = mock() + val request: StopIndexReplicationRequest = mock() + val listener: ActionListener = mock() + val exception = Exception("Test failure") + + // Mock the behavior of NodeClient.execute() + whenever(client.execute(any(), any(), any>())) + .thenAnswer { invocation -> + val actionListener = invocation.getArgument>(2) + actionListener.onFailure(exception) // Simulate failure + } + + val replicationPluginInterface = ReplicationPluginInterface() + // Call method under test + replicationPluginInterface.stopReplication(client, request, listener) + // Verify that listener.onResponse is called with the correct response + verify(listener).onFailure(exception) + } +} diff --git a/src/test/kotlin/org/opensearch/commons/replication/action/StopIndexReplicationRequestTests.kt b/src/test/kotlin/org/opensearch/commons/replication/action/StopIndexReplicationRequestTests.kt new file mode 100644 index 00000000..805b566d --- /dev/null +++ b/src/test/kotlin/org/opensearch/commons/replication/action/StopIndexReplicationRequestTests.kt @@ -0,0 +1,23 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.commons.replication.action + +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Assertions.assertNotNull +import org.junit.jupiter.api.Assertions.assertNull +import org.junit.jupiter.api.Test +import org.opensearch.commons.utils.recreateObject + +internal class StopIndexReplicationRequestTests { + @Test + fun `Stop Replication request serialize and deserialize transport object should be equal`() { + val index = "test-idx" + val request = StopIndexReplicationRequest(index) + val recreatedRequest = recreateObject(request) { StopIndexReplicationRequest(it) } + assertNotNull(recreatedRequest) + assertEquals(request.indexName, recreatedRequest.indexName) + assertNull(recreatedRequest.validate()) + } +}