-
Notifications
You must be signed in to change notification settings - Fork 96
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Adding replication (CCR) plugin interface and classes to common-utils #667
Open
aggarwalShivani
wants to merge
9
commits into
opensearch-project:main
Choose a base branch
from
nokia:ccr-libs
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
+258
−0
Open
Changes from all commits
Commits
Show all changes
9 commits
Select commit
Hold shift + click to select a range
da68444
Adding replication (CCR) plugin interface and classes
aggarwalShivani 54c8310
Merge branch 'opensearch-project:main' into ccr-libs
aggarwalShivani a3900cd
Adding new actiontype for unfollow replication through ism plugin
aggarwalShivani e46f3b6
Merge branch 'opensearch-project:main' into ccr-libs
aggarwalShivani 72a4fdf
Fix ktlint issues for replication libs
aggarwalShivani 4d65546
Merge branch 'opensearch-project:main' into ccr-libs
aggarwalShivani 8d673ba
Changes for stop-replication action
aggarwalShivani d042fc1
Merge branch 'opensearch-project:main' into ccr-libs
aggarwalShivani 833d9af
Fixed imports for AcknowledgedResponse and org.opensearch.transport c…
aggarwalShivani File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
68 changes: 68 additions & 0 deletions
68
src/main/kotlin/org/opensearch/commons/replication/ReplicationPluginInterface.kt
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,68 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
package org.opensearch.commons.replication | ||
|
||
import org.opensearch.action.support.clustermanager.AcknowledgedResponse | ||
import org.opensearch.transport.client.Client | ||
import org.opensearch.transport.client.node.NodeClient | ||
import org.opensearch.commons.replication.action.ReplicationActions.INTERNAL_STOP_REPLICATION_ACTION_TYPE | ||
import org.opensearch.commons.replication.action.StopIndexReplicationRequest | ||
import org.opensearch.commons.utils.recreateObject | ||
import org.opensearch.core.action.ActionListener | ||
import org.opensearch.core.action.ActionResponse | ||
import org.opensearch.core.common.io.stream.Writeable | ||
|
||
/** | ||
* Transport action plugin interfaces for the cross-cluster-replication plugin. | ||
*/ | ||
open class ReplicationPluginInterface { | ||
|
||
/** | ||
* Stop replication. | ||
* @param client Node client for making transport action | ||
* @param request The request object | ||
* @param listener The listener for getting response | ||
*/ | ||
|
||
open fun stopReplication( | ||
client: Client, | ||
request: StopIndexReplicationRequest, | ||
listener: ActionListener<AcknowledgedResponse> | ||
) { | ||
val nodeClient = client as NodeClient | ||
return nodeClient.execute( | ||
INTERNAL_STOP_REPLICATION_ACTION_TYPE, | ||
request, | ||
wrapActionListener(listener) { response -> | ||
recreateObject(response) { | ||
AcknowledgedResponse(it) | ||
} | ||
} | ||
) | ||
} | ||
|
||
/** | ||
* Wrap action listener on concrete response class by a new created one on ActionResponse. | ||
* This is required because the response may be loaded by different classloader across plugins. | ||
* The onResponse(ActionResponse) avoids type cast exception and give a chance to recreate | ||
* the response object. | ||
*/ | ||
@Suppress("UNCHECKED_CAST") | ||
private fun <Response : AcknowledgedResponse> wrapActionListener( | ||
bowenlan-amzn marked this conversation as resolved.
Show resolved
Hide resolved
|
||
listener: ActionListener<Response>, | ||
recreate: (Writeable) -> Response | ||
): ActionListener<Response> { | ||
return object : ActionListener<ActionResponse> { | ||
override fun onResponse(response: ActionResponse) { | ||
val recreated = response as? Response ?: recreate(response) | ||
listener.onResponse(recreated) | ||
} | ||
|
||
override fun onFailure(exception: java.lang.Exception) { | ||
listener.onFailure(exception) | ||
} | ||
} as ActionListener<Response> | ||
} | ||
} |
30 changes: 30 additions & 0 deletions
30
src/main/kotlin/org/opensearch/commons/replication/action/ReplicationActions.kt
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,30 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
package org.opensearch.commons.replication.action | ||
|
||
import org.opensearch.action.ActionType | ||
import org.opensearch.action.support.clustermanager.AcknowledgedResponse | ||
|
||
/** | ||
* Information related to the transport stop replication action for the Replication plugin | ||
*/ | ||
object ReplicationActions { | ||
|
||
/** | ||
* Action names for stopping replication | ||
* STOP_REPLICATION_ACTION_NAME: action used for _replication/_stop REST API | ||
* INTERNAL_STOP_REPLICATION_ACTION_NAME: Internal only - Used by Index Management plugin to invoke stop replication | ||
*/ | ||
const val STOP_REPLICATION_ACTION_NAME = "indices:admin/plugins/replication/index/stop" | ||
const val INTERNAL_STOP_REPLICATION_ACTION_NAME = "indices:internal/plugins/replication/index/stop" | ||
|
||
/** | ||
* Stop replication transport action types. | ||
*/ | ||
val STOP_REPLICATION_ACTION_TYPE = | ||
bowenlan-amzn marked this conversation as resolved.
Show resolved
Hide resolved
|
||
ActionType(STOP_REPLICATION_ACTION_NAME, ::AcknowledgedResponse) | ||
val INTERNAL_STOP_REPLICATION_ACTION_TYPE = | ||
ActionType(INTERNAL_STOP_REPLICATION_ACTION_NAME, ::AcknowledgedResponse) | ||
} |
70 changes: 70 additions & 0 deletions
70
src/main/kotlin/org/opensearch/commons/replication/action/StopIndexReplicationRequest.kt
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,70 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
package org.opensearch.commons.replication.action | ||
|
||
import org.opensearch.action.ActionRequestValidationException | ||
import org.opensearch.action.IndicesRequest | ||
import org.opensearch.action.support.IndicesOptions | ||
import org.opensearch.action.support.clustermanager.AcknowledgedRequest | ||
import org.opensearch.core.common.io.stream.StreamInput | ||
import org.opensearch.core.common.io.stream.StreamOutput | ||
import org.opensearch.core.xcontent.ObjectParser | ||
import org.opensearch.core.xcontent.ToXContent | ||
import org.opensearch.core.xcontent.ToXContentObject | ||
import org.opensearch.core.xcontent.XContentBuilder | ||
import org.opensearch.core.xcontent.XContentParser | ||
|
||
class StopIndexReplicationRequest : | ||
AcknowledgedRequest<StopIndexReplicationRequest>, IndicesRequest.Replaceable, ToXContentObject { | ||
lateinit var indexName: String | ||
constructor(indexName: String) { | ||
this.indexName = indexName | ||
} | ||
|
||
private constructor() { | ||
} | ||
|
||
constructor(inp: StreamInput) : super(inp) { | ||
indexName = inp.readString() | ||
} | ||
companion object { | ||
private val PARSER = ObjectParser<StopIndexReplicationRequest, Void>("StopReplicationRequestParser") { | ||
StopIndexReplicationRequest() | ||
} | ||
|
||
fun fromXContent(parser: XContentParser, followerIndex: String): StopIndexReplicationRequest { | ||
val stopIndexReplicationRequest = PARSER.parse(parser, null) | ||
stopIndexReplicationRequest.indexName = followerIndex | ||
return stopIndexReplicationRequest | ||
} | ||
} | ||
|
||
override fun validate(): ActionRequestValidationException? { | ||
return null | ||
} | ||
|
||
override fun indices(vararg indices: String?): IndicesRequest { | ||
return this | ||
} | ||
override fun indices(): Array<String> { | ||
return arrayOf(indexName) | ||
} | ||
|
||
override fun indicesOptions(): IndicesOptions { | ||
return IndicesOptions.strictSingleIndexNoExpandForbidClosed() | ||
} | ||
|
||
override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { | ||
builder.startObject() | ||
builder.field("indexName", indexName) | ||
builder.endObject() | ||
return builder | ||
} | ||
|
||
override fun writeTo(out: StreamOutput) { | ||
super.writeTo(out) | ||
out.writeString(indexName) | ||
} | ||
} |
67 changes: 67 additions & 0 deletions
67
src/test/kotlin/org/opensearch/commons/replication/ReplicationPluginInterfaceTests.kt
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,67 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package org.opensearch.commons.replication | ||
|
||
import com.nhaarman.mockitokotlin2.whenever | ||
import org.junit.jupiter.api.Test | ||
import org.junit.jupiter.api.extension.ExtendWith | ||
import org.mockito.Mockito.any | ||
import org.mockito.Mockito.mock | ||
import org.mockito.Mockito.verify | ||
import org.mockito.junit.jupiter.MockitoExtension | ||
import org.opensearch.action.support.clustermanager.AcknowledgedResponse | ||
import org.opensearch.transport.client.node.NodeClient | ||
import org.opensearch.commons.replication.action.StopIndexReplicationRequest | ||
import org.opensearch.core.action.ActionListener | ||
import org.opensearch.core.action.ActionResponse | ||
|
||
@ExtendWith(MockitoExtension::class) | ||
internal class ReplicationPluginInterfaceTests { | ||
|
||
@Test | ||
fun `test stopReplication successful response`() { | ||
// Mock dependencies | ||
val client: NodeClient = mock() | ||
val request: StopIndexReplicationRequest = mock() | ||
val listener: ActionListener<AcknowledgedResponse> = mock() | ||
val acknowledgedResponse = AcknowledgedResponse(true) // Successful response | ||
|
||
// Mock the behavior of NodeClient.execute() | ||
whenever(client.execute(any(), any(), any<ActionListener<ActionResponse>>())) | ||
.thenAnswer { invocation -> | ||
val actionListener = invocation.getArgument<ActionListener<ActionResponse>>(2) | ||
actionListener.onResponse(acknowledgedResponse) // Simulate success | ||
} | ||
|
||
val replicationPluginInterface = ReplicationPluginInterface() | ||
// Call method under test | ||
replicationPluginInterface.stopReplication(client, request, listener) | ||
// Verify that listener.onResponse is called with the correct response | ||
verify(listener).onResponse(acknowledgedResponse) | ||
} | ||
|
||
@Test | ||
fun `test stopReplication failure response`() { | ||
// Mock dependencies | ||
val client: NodeClient = mock() | ||
val request: StopIndexReplicationRequest = mock() | ||
val listener: ActionListener<AcknowledgedResponse> = mock() | ||
val exception = Exception("Test failure") | ||
|
||
// Mock the behavior of NodeClient.execute() | ||
whenever(client.execute(any(), any(), any<ActionListener<ActionResponse>>())) | ||
.thenAnswer { invocation -> | ||
val actionListener = invocation.getArgument<ActionListener<ActionResponse>>(2) | ||
actionListener.onFailure(exception) // Simulate failure | ||
} | ||
|
||
val replicationPluginInterface = ReplicationPluginInterface() | ||
// Call method under test | ||
replicationPluginInterface.stopReplication(client, request, listener) | ||
// Verify that listener.onResponse is called with the correct response | ||
verify(listener).onFailure(exception) | ||
} | ||
} |
23 changes: 23 additions & 0 deletions
23
...test/kotlin/org/opensearch/commons/replication/action/StopIndexReplicationRequestTests.kt
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,23 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
package org.opensearch.commons.replication.action | ||
|
||
import org.junit.jupiter.api.Assertions.assertEquals | ||
import org.junit.jupiter.api.Assertions.assertNotNull | ||
import org.junit.jupiter.api.Assertions.assertNull | ||
import org.junit.jupiter.api.Test | ||
import org.opensearch.commons.utils.recreateObject | ||
|
||
internal class StopIndexReplicationRequestTests { | ||
@Test | ||
fun `Stop Replication request serialize and deserialize transport object should be equal`() { | ||
val index = "test-idx" | ||
val request = StopIndexReplicationRequest(index) | ||
val recreatedRequest = recreateObject(request) { StopIndexReplicationRequest(it) } | ||
assertNotNull(recreatedRequest) | ||
assertEquals(request.indexName, recreatedRequest.indexName) | ||
assertNull(recreatedRequest.validate()) | ||
} | ||
} |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Feels like the methods here should just be static methods.