From da6844457ea52f1a5dee59abb363b1b2044784f1 Mon Sep 17 00:00:00 2001 From: aggarwalShivani Date: Wed, 5 Jun 2024 00:09:37 +0530 Subject: [PATCH 1/5] Adding replication (CCR) plugin interface and classes Signed-off-by: aggarwalShivani --- .../replication/ReplicationPluginInterface.kt | 67 +++++++++++++++++++ .../replication/action/ReplicationActions.kt | 27 ++++++++ .../action/StopIndexReplicationRequest.kt | 62 +++++++++++++++++ .../ReplicationPluginInterfaceTests.kt | 43 ++++++++++++ .../StopIndexReplicationRequestTests.kt | 23 +++++++ 5 files changed, 222 insertions(+) create mode 100755 src/main/kotlin/org/opensearch/commons/replication/ReplicationPluginInterface.kt create mode 100755 src/main/kotlin/org/opensearch/commons/replication/action/ReplicationActions.kt create mode 100755 src/main/kotlin/org/opensearch/commons/replication/action/StopIndexReplicationRequest.kt create mode 100755 src/test/kotlin/org/opensearch/commons/replication/ReplicationPluginInterfaceTests.kt create mode 100755 src/test/kotlin/org/opensearch/commons/replication/action/StopIndexReplicationRequestTests.kt diff --git a/src/main/kotlin/org/opensearch/commons/replication/ReplicationPluginInterface.kt b/src/main/kotlin/org/opensearch/commons/replication/ReplicationPluginInterface.kt new file mode 100755 index 00000000..74c00744 --- /dev/null +++ b/src/main/kotlin/org/opensearch/commons/replication/ReplicationPluginInterface.kt @@ -0,0 +1,67 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.commons.replication + +import org.opensearch.action.support.master.AcknowledgedResponse +import org.opensearch.client.node.NodeClient +import org.opensearch.commons.replication.action.StopIndexReplicationRequest +import org.opensearch.commons.replication.action.ReplicationActions.STOP_REPLICATION_ACTION_TYPE +import org.opensearch.commons.utils.recreateObject +import org.opensearch.core.action.ActionListener +import org.opensearch.core.action.ActionResponse +import org.opensearch.core.common.io.stream.Writeable + + +/** + * All the transport action plugin interfaces for the cross-cluster-replication plugin. + */ +object ReplicationPluginInterface { + + /** + * Stop replication. + * @param client Node client for making transport action + * @param request The request object + * @param listener The listener for getting response + */ + + fun stopReplication( + client: NodeClient, + request: StopIndexReplicationRequest, + listener: ActionListener + ) { + return client.execute( + STOP_REPLICATION_ACTION_TYPE, + request, + wrapActionListener(listener) { response -> + recreateObject(response) { + AcknowledgedResponse(it) + } + } + ) + } + + /** + * Wrap action listener on concrete response class by a new created one on ActionResponse. + * This is required because the response may be loaded by different classloader across plugins. + * The onResponse(ActionResponse) avoids type cast exception and give a chance to recreate + * the response object. + */ + @Suppress("UNCHECKED_CAST") + private fun wrapActionListener( + listener: ActionListener, + recreate: (Writeable) -> Response + ): ActionListener { + return object : ActionListener { + override fun onResponse(response: ActionResponse) { + val recreated = response as? Response ?: recreate(response) + listener.onResponse(recreated) + } + + override fun onFailure(exception: java.lang.Exception) { + listener.onFailure(exception) + } + } as ActionListener + } +} diff --git a/src/main/kotlin/org/opensearch/commons/replication/action/ReplicationActions.kt b/src/main/kotlin/org/opensearch/commons/replication/action/ReplicationActions.kt new file mode 100755 index 00000000..69f49332 --- /dev/null +++ b/src/main/kotlin/org/opensearch/commons/replication/action/ReplicationActions.kt @@ -0,0 +1,27 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.commons.replication.action + +import org.opensearch.action.ActionType +import org.opensearch.action.support.master.AcknowledgedResponse + +/** + * All the transport action information for the Replication plugin + */ +object ReplicationActions { + + /** + * Stop replication. Internal only - Inter plugin communication. + */ + const val STOP_REPLICATION_NAME = "indices:admin/plugins/replication/index/stop" + const val STOP_REPLICATION_BASE_ACTION_NAME = "indices:admin/plugins/replication/index/unfollow" + + /** + * Stop replication transport action type. Internal only - Inter plugin communication. + */ + val STOP_REPLICATION_ACTION_TYPE = + ActionType(STOP_REPLICATION_NAME, ::AcknowledgedResponse) + +} diff --git a/src/main/kotlin/org/opensearch/commons/replication/action/StopIndexReplicationRequest.kt b/src/main/kotlin/org/opensearch/commons/replication/action/StopIndexReplicationRequest.kt new file mode 100755 index 00000000..188edeb5 --- /dev/null +++ b/src/main/kotlin/org/opensearch/commons/replication/action/StopIndexReplicationRequest.kt @@ -0,0 +1,62 @@ +package org.opensearch.commons.replication.action + +import org.opensearch.action.ActionRequestValidationException +import org.apache.logging.log4j.LogManager +import org.opensearch.action.IndicesRequest +import org.opensearch.action.support.IndicesOptions +import org.opensearch.action.support.master.AcknowledgedRequest +import org.opensearch.core.common.io.stream.StreamInput +import org.opensearch.core.common.io.stream.StreamOutput +import org.opensearch.core.xcontent.* +class StopIndexReplicationRequest : AcknowledgedRequest, IndicesRequest.Replaceable, ToXContentObject { + lateinit var indexName: String + constructor(indexName: String) { + this.indexName = indexName + } + + private constructor() { + } + + constructor(inp: StreamInput): super(inp) { + indexName = inp.readString() + } + companion object { + private val PARSER = ObjectParser("StopReplicationRequestParser") { + StopIndexReplicationRequest() + } + + fun fromXContent(parser: XContentParser, followerIndex: String): StopIndexReplicationRequest { + val stopIndexReplicationRequest = PARSER.parse(parser, null) + stopIndexReplicationRequest.indexName = followerIndex + return stopIndexReplicationRequest + } + private val log = LogManager.getLogger(StopIndexReplicationRequest::class.java) + } + + override fun validate(): ActionRequestValidationException? { + return null + } + + override fun indices(vararg indices: String?): IndicesRequest { + return this + } + override fun indices(): Array { + return arrayOf(indexName) + } + + override fun indicesOptions(): IndicesOptions { + return IndicesOptions.strictSingleIndexNoExpandForbidClosed() + } + + override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { + builder.startObject() + builder.field("indexName", indexName) + builder.endObject() + return builder + } + + override fun writeTo(out: StreamOutput) { + super.writeTo(out) + out.writeString(indexName) + } +} \ No newline at end of file diff --git a/src/test/kotlin/org/opensearch/commons/replication/ReplicationPluginInterfaceTests.kt b/src/test/kotlin/org/opensearch/commons/replication/ReplicationPluginInterfaceTests.kt new file mode 100755 index 00000000..c47843a9 --- /dev/null +++ b/src/test/kotlin/org/opensearch/commons/replication/ReplicationPluginInterfaceTests.kt @@ -0,0 +1,43 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.commons.replication + +import com.nhaarman.mockitokotlin2.whenever +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.extension.ExtendWith +import org.mockito.Answers +import org.mockito.ArgumentMatchers +import org.mockito.Mock +import org.mockito.Mockito +import org.mockito.junit.jupiter.MockitoExtension +import org.opensearch.action.ActionType +import org.opensearch.action.support.master.AcknowledgedResponse +import org.opensearch.client.node.NodeClient +import org.opensearch.commons.replication.action.StopIndexReplicationRequest +import org.opensearch.core.action.ActionListener + +@Suppress("UNCHECKED_CAST") +@ExtendWith(MockitoExtension::class) +internal class ReplicationPluginInterfaceTests { + + @Mock(answer = Answers.RETURNS_DEEP_STUBS) + private lateinit var client: NodeClient + @Test + fun stopReplication() { + val request = Mockito.mock(StopIndexReplicationRequest::class.java) + val response = AcknowledgedResponse(true) + val listener: ActionListener = + Mockito.mock(ActionListener::class.java) as ActionListener + + Mockito.doAnswer { + (it.getArgument(2) as ActionListener) + .onResponse(response) + }.whenever(client).execute(Mockito.any(ActionType::class.java), Mockito.any(), Mockito.any()) + + ReplicationPluginInterface.stopReplication(client, request, listener) + Mockito.verify(listener, Mockito.times(1)).onResponse(ArgumentMatchers.eq(response)) + } +} \ No newline at end of file diff --git a/src/test/kotlin/org/opensearch/commons/replication/action/StopIndexReplicationRequestTests.kt b/src/test/kotlin/org/opensearch/commons/replication/action/StopIndexReplicationRequestTests.kt new file mode 100755 index 00000000..e64cb976 --- /dev/null +++ b/src/test/kotlin/org/opensearch/commons/replication/action/StopIndexReplicationRequestTests.kt @@ -0,0 +1,23 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.commons.replication.action + +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Assertions.assertNotNull +import org.junit.jupiter.api.Assertions.assertNull +import org.junit.jupiter.api.Test +import org.opensearch.commons.utils.recreateObject + +internal class StopIndexReplicationRequestTests { + @Test + fun `Stop Replication request serialize and deserialize transport object should be equal`() { + val index = "test-idx" + val request = StopIndexReplicationRequest(index) + val recreatedRequest = recreateObject(request) { StopIndexReplicationRequest(it) } + assertNotNull(recreatedRequest) + assertEquals(request.indexName, recreatedRequest.indexName) + assertNull(recreatedRequest.validate()) + } +} \ No newline at end of file From a3900cd250abdbfad6eafdbd20df98167f681b64 Mon Sep 17 00:00:00 2001 From: aggarwalShivani Date: Thu, 27 Jun 2024 11:38:31 +0530 Subject: [PATCH 2/5] Adding new actiontype for unfollow replication through ism plugin Signed-off-by: aggarwalShivani --- .../replication/ReplicationPluginInterface.kt | 6 +++--- .../replication/action/ReplicationActions.kt | 17 +++++++++++------ 2 files changed, 14 insertions(+), 9 deletions(-) diff --git a/src/main/kotlin/org/opensearch/commons/replication/ReplicationPluginInterface.kt b/src/main/kotlin/org/opensearch/commons/replication/ReplicationPluginInterface.kt index 74c00744..51812179 100755 --- a/src/main/kotlin/org/opensearch/commons/replication/ReplicationPluginInterface.kt +++ b/src/main/kotlin/org/opensearch/commons/replication/ReplicationPluginInterface.kt @@ -7,7 +7,7 @@ package org.opensearch.commons.replication import org.opensearch.action.support.master.AcknowledgedResponse import org.opensearch.client.node.NodeClient import org.opensearch.commons.replication.action.StopIndexReplicationRequest -import org.opensearch.commons.replication.action.ReplicationActions.STOP_REPLICATION_ACTION_TYPE +import org.opensearch.commons.replication.action.ReplicationActions.UNFOLLOW_REPLICATION_ACTION_TYPE import org.opensearch.commons.utils.recreateObject import org.opensearch.core.action.ActionListener import org.opensearch.core.action.ActionResponse @@ -15,7 +15,7 @@ import org.opensearch.core.common.io.stream.Writeable /** - * All the transport action plugin interfaces for the cross-cluster-replication plugin. + * Transport action plugin interfaces for the cross-cluster-replication plugin. */ object ReplicationPluginInterface { @@ -32,7 +32,7 @@ object ReplicationPluginInterface { listener: ActionListener ) { return client.execute( - STOP_REPLICATION_ACTION_TYPE, + UNFOLLOW_REPLICATION_ACTION_TYPE, request, wrapActionListener(listener) { response -> recreateObject(response) { diff --git a/src/main/kotlin/org/opensearch/commons/replication/action/ReplicationActions.kt b/src/main/kotlin/org/opensearch/commons/replication/action/ReplicationActions.kt index 69f49332..46a76058 100755 --- a/src/main/kotlin/org/opensearch/commons/replication/action/ReplicationActions.kt +++ b/src/main/kotlin/org/opensearch/commons/replication/action/ReplicationActions.kt @@ -8,20 +8,25 @@ import org.opensearch.action.ActionType import org.opensearch.action.support.master.AcknowledgedResponse /** - * All the transport action information for the Replication plugin + * Information related to the transport stop replication action for the Replication plugin */ object ReplicationActions { /** - * Stop replication. Internal only - Inter plugin communication. + * Action names for stopping replication + * STOP_REPLICATION_ACTION_NAME: action used for _stop REST API + * UNFOLLOW_REPLICATION_ACTION_NAME: internal action used for inter-plugin communication i.e. by ism to invoke stop + * replication. */ - const val STOP_REPLICATION_NAME = "indices:admin/plugins/replication/index/stop" - const val STOP_REPLICATION_BASE_ACTION_NAME = "indices:admin/plugins/replication/index/unfollow" + const val STOP_REPLICATION_ACTION_NAME = "indices:admin/plugins/replication/index/stop" + const val UNFOLLOW_REPLICATION_ACTION_NAME = "indices:admin/plugins/replication/index/unfollow" /** - * Stop replication transport action type. Internal only - Inter plugin communication. + * Stop replication transport action types. */ val STOP_REPLICATION_ACTION_TYPE = - ActionType(STOP_REPLICATION_NAME, ::AcknowledgedResponse) + ActionType(STOP_REPLICATION_ACTION_NAME, ::AcknowledgedResponse) + val UNFOLLOW_REPLICATION_ACTION_TYPE = + ActionType(UNFOLLOW_REPLICATION_ACTION_NAME, ::AcknowledgedResponse) } From 72a4fdf5e6004037633a7342df81f01ff3e52115 Mon Sep 17 00:00:00 2001 From: aggarwalShivani Date: Mon, 8 Jul 2024 16:34:32 +0530 Subject: [PATCH 3/5] Fix ktlint issues for replication libs Signed-off-by: aggarwalShivani --- .../replication/ReplicationPluginInterface.kt | 3 +-- .../replication/action/ReplicationActions.kt | 1 - .../action/StopIndexReplicationRequest.kt | 16 ++++++++++------ .../ReplicationPluginInterfaceTests.kt | 3 ++- .../action/StopIndexReplicationRequestTests.kt | 2 +- 5 files changed, 14 insertions(+), 11 deletions(-) diff --git a/src/main/kotlin/org/opensearch/commons/replication/ReplicationPluginInterface.kt b/src/main/kotlin/org/opensearch/commons/replication/ReplicationPluginInterface.kt index 51812179..be42c9d3 100755 --- a/src/main/kotlin/org/opensearch/commons/replication/ReplicationPluginInterface.kt +++ b/src/main/kotlin/org/opensearch/commons/replication/ReplicationPluginInterface.kt @@ -6,14 +6,13 @@ package org.opensearch.commons.replication import org.opensearch.action.support.master.AcknowledgedResponse import org.opensearch.client.node.NodeClient -import org.opensearch.commons.replication.action.StopIndexReplicationRequest import org.opensearch.commons.replication.action.ReplicationActions.UNFOLLOW_REPLICATION_ACTION_TYPE +import org.opensearch.commons.replication.action.StopIndexReplicationRequest import org.opensearch.commons.utils.recreateObject import org.opensearch.core.action.ActionListener import org.opensearch.core.action.ActionResponse import org.opensearch.core.common.io.stream.Writeable - /** * Transport action plugin interfaces for the cross-cluster-replication plugin. */ diff --git a/src/main/kotlin/org/opensearch/commons/replication/action/ReplicationActions.kt b/src/main/kotlin/org/opensearch/commons/replication/action/ReplicationActions.kt index 46a76058..0de1788c 100755 --- a/src/main/kotlin/org/opensearch/commons/replication/action/ReplicationActions.kt +++ b/src/main/kotlin/org/opensearch/commons/replication/action/ReplicationActions.kt @@ -28,5 +28,4 @@ object ReplicationActions { ActionType(STOP_REPLICATION_ACTION_NAME, ::AcknowledgedResponse) val UNFOLLOW_REPLICATION_ACTION_TYPE = ActionType(UNFOLLOW_REPLICATION_ACTION_NAME, ::AcknowledgedResponse) - } diff --git a/src/main/kotlin/org/opensearch/commons/replication/action/StopIndexReplicationRequest.kt b/src/main/kotlin/org/opensearch/commons/replication/action/StopIndexReplicationRequest.kt index 188edeb5..25ceb055 100755 --- a/src/main/kotlin/org/opensearch/commons/replication/action/StopIndexReplicationRequest.kt +++ b/src/main/kotlin/org/opensearch/commons/replication/action/StopIndexReplicationRequest.kt @@ -1,14 +1,19 @@ package org.opensearch.commons.replication.action import org.opensearch.action.ActionRequestValidationException -import org.apache.logging.log4j.LogManager import org.opensearch.action.IndicesRequest import org.opensearch.action.support.IndicesOptions import org.opensearch.action.support.master.AcknowledgedRequest import org.opensearch.core.common.io.stream.StreamInput import org.opensearch.core.common.io.stream.StreamOutput -import org.opensearch.core.xcontent.* -class StopIndexReplicationRequest : AcknowledgedRequest, IndicesRequest.Replaceable, ToXContentObject { +import org.opensearch.core.xcontent.ObjectParser +import org.opensearch.core.xcontent.ToXContent +import org.opensearch.core.xcontent.ToXContentObject +import org.opensearch.core.xcontent.XContentBuilder +import org.opensearch.core.xcontent.XContentParser + +class StopIndexReplicationRequest : + AcknowledgedRequest, IndicesRequest.Replaceable, ToXContentObject { lateinit var indexName: String constructor(indexName: String) { this.indexName = indexName @@ -17,7 +22,7 @@ class StopIndexReplicationRequest : AcknowledgedRequest Date: Wed, 29 Jan 2025 17:50:33 +0530 Subject: [PATCH 4/5] Changes for stop-replication action Signed-off-by: aggarwalShivani --- .../replication/ReplicationPluginInterface.kt | 14 ++-- .../replication/action/ReplicationActions.kt | 11 ++-- .../action/StopIndexReplicationRequest.kt | 4 ++ .../ReplicationPluginInterfaceTests.kt | 65 +++++++++++++------ .../StopIndexReplicationRequestTests.kt | 0 5 files changed, 61 insertions(+), 33 deletions(-) mode change 100755 => 100644 src/main/kotlin/org/opensearch/commons/replication/ReplicationPluginInterface.kt mode change 100755 => 100644 src/main/kotlin/org/opensearch/commons/replication/action/ReplicationActions.kt mode change 100755 => 100644 src/main/kotlin/org/opensearch/commons/replication/action/StopIndexReplicationRequest.kt mode change 100755 => 100644 src/test/kotlin/org/opensearch/commons/replication/ReplicationPluginInterfaceTests.kt mode change 100755 => 100644 src/test/kotlin/org/opensearch/commons/replication/action/StopIndexReplicationRequestTests.kt diff --git a/src/main/kotlin/org/opensearch/commons/replication/ReplicationPluginInterface.kt b/src/main/kotlin/org/opensearch/commons/replication/ReplicationPluginInterface.kt old mode 100755 new mode 100644 index be42c9d3..241210a0 --- a/src/main/kotlin/org/opensearch/commons/replication/ReplicationPluginInterface.kt +++ b/src/main/kotlin/org/opensearch/commons/replication/ReplicationPluginInterface.kt @@ -5,8 +5,9 @@ package org.opensearch.commons.replication import org.opensearch.action.support.master.AcknowledgedResponse +import org.opensearch.client.Client import org.opensearch.client.node.NodeClient -import org.opensearch.commons.replication.action.ReplicationActions.UNFOLLOW_REPLICATION_ACTION_TYPE +import org.opensearch.commons.replication.action.ReplicationActions.INTERNAL_STOP_REPLICATION_ACTION_TYPE import org.opensearch.commons.replication.action.StopIndexReplicationRequest import org.opensearch.commons.utils.recreateObject import org.opensearch.core.action.ActionListener @@ -16,7 +17,7 @@ import org.opensearch.core.common.io.stream.Writeable /** * Transport action plugin interfaces for the cross-cluster-replication plugin. */ -object ReplicationPluginInterface { +open class ReplicationPluginInterface { /** * Stop replication. @@ -25,13 +26,14 @@ object ReplicationPluginInterface { * @param listener The listener for getting response */ - fun stopReplication( - client: NodeClient, + open fun stopReplication( + client: Client, request: StopIndexReplicationRequest, listener: ActionListener ) { - return client.execute( - UNFOLLOW_REPLICATION_ACTION_TYPE, + val nodeClient = client as NodeClient + return nodeClient.execute( + INTERNAL_STOP_REPLICATION_ACTION_TYPE, request, wrapActionListener(listener) { response -> recreateObject(response) { diff --git a/src/main/kotlin/org/opensearch/commons/replication/action/ReplicationActions.kt b/src/main/kotlin/org/opensearch/commons/replication/action/ReplicationActions.kt old mode 100755 new mode 100644 index 0de1788c..7aa27666 --- a/src/main/kotlin/org/opensearch/commons/replication/action/ReplicationActions.kt +++ b/src/main/kotlin/org/opensearch/commons/replication/action/ReplicationActions.kt @@ -14,18 +14,17 @@ object ReplicationActions { /** * Action names for stopping replication - * STOP_REPLICATION_ACTION_NAME: action used for _stop REST API - * UNFOLLOW_REPLICATION_ACTION_NAME: internal action used for inter-plugin communication i.e. by ism to invoke stop - * replication. + * STOP_REPLICATION_ACTION_NAME: action used for _replication/_stop REST API + * INTERNAL_STOP_REPLICATION_ACTION_NAME: Internal only - Used by Index Management plugin to invoke stop replication */ const val STOP_REPLICATION_ACTION_NAME = "indices:admin/plugins/replication/index/stop" - const val UNFOLLOW_REPLICATION_ACTION_NAME = "indices:admin/plugins/replication/index/unfollow" + const val INTERNAL_STOP_REPLICATION_ACTION_NAME = "indices:internal/plugins/replication/index/stop" /** * Stop replication transport action types. */ val STOP_REPLICATION_ACTION_TYPE = ActionType(STOP_REPLICATION_ACTION_NAME, ::AcknowledgedResponse) - val UNFOLLOW_REPLICATION_ACTION_TYPE = - ActionType(UNFOLLOW_REPLICATION_ACTION_NAME, ::AcknowledgedResponse) + val INTERNAL_STOP_REPLICATION_ACTION_TYPE = + ActionType(INTERNAL_STOP_REPLICATION_ACTION_NAME, ::AcknowledgedResponse) } diff --git a/src/main/kotlin/org/opensearch/commons/replication/action/StopIndexReplicationRequest.kt b/src/main/kotlin/org/opensearch/commons/replication/action/StopIndexReplicationRequest.kt old mode 100755 new mode 100644 index 25ceb055..a3e46467 --- a/src/main/kotlin/org/opensearch/commons/replication/action/StopIndexReplicationRequest.kt +++ b/src/main/kotlin/org/opensearch/commons/replication/action/StopIndexReplicationRequest.kt @@ -1,3 +1,7 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ package org.opensearch.commons.replication.action import org.opensearch.action.ActionRequestValidationException diff --git a/src/test/kotlin/org/opensearch/commons/replication/ReplicationPluginInterfaceTests.kt b/src/test/kotlin/org/opensearch/commons/replication/ReplicationPluginInterfaceTests.kt old mode 100755 new mode 100644 index 47c20543..d6af86d0 --- a/src/test/kotlin/org/opensearch/commons/replication/ReplicationPluginInterfaceTests.kt +++ b/src/test/kotlin/org/opensearch/commons/replication/ReplicationPluginInterfaceTests.kt @@ -8,37 +8,60 @@ package org.opensearch.commons.replication import com.nhaarman.mockitokotlin2.whenever import org.junit.jupiter.api.Test import org.junit.jupiter.api.extension.ExtendWith -import org.mockito.Answers -import org.mockito.ArgumentMatchers -import org.mockito.Mock -import org.mockito.Mockito +import org.mockito.Mockito.any +import org.mockito.Mockito.verify +import org.mockito.Mockito.mock import org.mockito.junit.jupiter.MockitoExtension -import org.opensearch.action.ActionType import org.opensearch.action.support.master.AcknowledgedResponse import org.opensearch.client.node.NodeClient import org.opensearch.commons.replication.action.StopIndexReplicationRequest import org.opensearch.core.action.ActionListener +import org.opensearch.core.action.ActionResponse -@Suppress("UNCHECKED_CAST") @ExtendWith(MockitoExtension::class) internal class ReplicationPluginInterfaceTests { - @Mock(answer = Answers.RETURNS_DEEP_STUBS) - private lateinit var client: NodeClient + @Test + fun `test stopReplication successful response`() { + // Mock dependencies + val client: NodeClient = mock() + val request: StopIndexReplicationRequest = mock() + val listener: ActionListener = mock() + val acknowledgedResponse = AcknowledgedResponse(true) // Successful response + + // Mock the behavior of NodeClient.execute() + whenever(client.execute(any(), any(), any>())) + .thenAnswer { invocation -> + val actionListener = invocation.getArgument>(2) + actionListener.onResponse(acknowledgedResponse) // Simulate success + } + + val replicationPluginInterface = ReplicationPluginInterface() + // Call method under test + replicationPluginInterface.stopReplication(client, request, listener) + // Verify that listener.onResponse is called with the correct response + verify(listener).onResponse(acknowledgedResponse) + } @Test - fun stopReplication() { - val request = Mockito.mock(StopIndexReplicationRequest::class.java) - val response = AcknowledgedResponse(true) - val listener: ActionListener = - Mockito.mock(ActionListener::class.java) as ActionListener - - Mockito.doAnswer { - (it.getArgument(2) as ActionListener) - .onResponse(response) - }.whenever(client).execute(Mockito.any(ActionType::class.java), Mockito.any(), Mockito.any()) - - ReplicationPluginInterface.stopReplication(client, request, listener) - Mockito.verify(listener, Mockito.times(1)).onResponse(ArgumentMatchers.eq(response)) + fun `test stopReplication failure response`() { + // Mock dependencies + val client: NodeClient = mock() + val request: StopIndexReplicationRequest = mock() + val listener: ActionListener = mock() + val exception = Exception("Test failure") + + // Mock the behavior of NodeClient.execute() + whenever(client.execute(any(), any(), any>())) + .thenAnswer { invocation -> + val actionListener = invocation.getArgument>(2) + actionListener.onFailure(exception) // Simulate failure + } + + val replicationPluginInterface = ReplicationPluginInterface() + // Call method under test + replicationPluginInterface.stopReplication(client, request, listener) + // Verify that listener.onResponse is called with the correct response + verify(listener).onFailure(exception) } } diff --git a/src/test/kotlin/org/opensearch/commons/replication/action/StopIndexReplicationRequestTests.kt b/src/test/kotlin/org/opensearch/commons/replication/action/StopIndexReplicationRequestTests.kt old mode 100755 new mode 100644 From 833d9af70cd421d9205f3083357124da932f5af5 Mon Sep 17 00:00:00 2001 From: aggarwalShivani Date: Wed, 19 Feb 2025 20:10:41 +0530 Subject: [PATCH 5/5] Fixed imports for AcknowledgedResponse and org.opensearch.transport classes Signed-off-by: aggarwalShivani --- .../commons/replication/ReplicationPluginInterface.kt | 6 +++--- .../commons/replication/action/ReplicationActions.kt | 2 +- .../replication/action/StopIndexReplicationRequest.kt | 2 +- .../commons/replication/ReplicationPluginInterfaceTests.kt | 6 +++--- 4 files changed, 8 insertions(+), 8 deletions(-) diff --git a/src/main/kotlin/org/opensearch/commons/replication/ReplicationPluginInterface.kt b/src/main/kotlin/org/opensearch/commons/replication/ReplicationPluginInterface.kt index 241210a0..e059f404 100644 --- a/src/main/kotlin/org/opensearch/commons/replication/ReplicationPluginInterface.kt +++ b/src/main/kotlin/org/opensearch/commons/replication/ReplicationPluginInterface.kt @@ -4,9 +4,9 @@ */ package org.opensearch.commons.replication -import org.opensearch.action.support.master.AcknowledgedResponse -import org.opensearch.client.Client -import org.opensearch.client.node.NodeClient +import org.opensearch.action.support.clustermanager.AcknowledgedResponse +import org.opensearch.transport.client.Client +import org.opensearch.transport.client.node.NodeClient import org.opensearch.commons.replication.action.ReplicationActions.INTERNAL_STOP_REPLICATION_ACTION_TYPE import org.opensearch.commons.replication.action.StopIndexReplicationRequest import org.opensearch.commons.utils.recreateObject diff --git a/src/main/kotlin/org/opensearch/commons/replication/action/ReplicationActions.kt b/src/main/kotlin/org/opensearch/commons/replication/action/ReplicationActions.kt index 7aa27666..711fb217 100644 --- a/src/main/kotlin/org/opensearch/commons/replication/action/ReplicationActions.kt +++ b/src/main/kotlin/org/opensearch/commons/replication/action/ReplicationActions.kt @@ -5,7 +5,7 @@ package org.opensearch.commons.replication.action import org.opensearch.action.ActionType -import org.opensearch.action.support.master.AcknowledgedResponse +import org.opensearch.action.support.clustermanager.AcknowledgedResponse /** * Information related to the transport stop replication action for the Replication plugin diff --git a/src/main/kotlin/org/opensearch/commons/replication/action/StopIndexReplicationRequest.kt b/src/main/kotlin/org/opensearch/commons/replication/action/StopIndexReplicationRequest.kt index a3e46467..e4940ea3 100644 --- a/src/main/kotlin/org/opensearch/commons/replication/action/StopIndexReplicationRequest.kt +++ b/src/main/kotlin/org/opensearch/commons/replication/action/StopIndexReplicationRequest.kt @@ -7,7 +7,7 @@ package org.opensearch.commons.replication.action import org.opensearch.action.ActionRequestValidationException import org.opensearch.action.IndicesRequest import org.opensearch.action.support.IndicesOptions -import org.opensearch.action.support.master.AcknowledgedRequest +import org.opensearch.action.support.clustermanager.AcknowledgedRequest import org.opensearch.core.common.io.stream.StreamInput import org.opensearch.core.common.io.stream.StreamOutput import org.opensearch.core.xcontent.ObjectParser diff --git a/src/test/kotlin/org/opensearch/commons/replication/ReplicationPluginInterfaceTests.kt b/src/test/kotlin/org/opensearch/commons/replication/ReplicationPluginInterfaceTests.kt index d6af86d0..e07c2bef 100644 --- a/src/test/kotlin/org/opensearch/commons/replication/ReplicationPluginInterfaceTests.kt +++ b/src/test/kotlin/org/opensearch/commons/replication/ReplicationPluginInterfaceTests.kt @@ -9,11 +9,11 @@ import com.nhaarman.mockitokotlin2.whenever import org.junit.jupiter.api.Test import org.junit.jupiter.api.extension.ExtendWith import org.mockito.Mockito.any -import org.mockito.Mockito.verify import org.mockito.Mockito.mock +import org.mockito.Mockito.verify import org.mockito.junit.jupiter.MockitoExtension -import org.opensearch.action.support.master.AcknowledgedResponse -import org.opensearch.client.node.NodeClient +import org.opensearch.action.support.clustermanager.AcknowledgedResponse +import org.opensearch.transport.client.node.NodeClient import org.opensearch.commons.replication.action.StopIndexReplicationRequest import org.opensearch.core.action.ActionListener import org.opensearch.core.action.ActionResponse