Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding replication (CCR) plugin interface and classes to common-utils #667

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/
package org.opensearch.commons.replication

import org.opensearch.action.support.clustermanager.AcknowledgedResponse
import org.opensearch.transport.client.Client
import org.opensearch.transport.client.node.NodeClient
import org.opensearch.commons.replication.action.ReplicationActions.INTERNAL_STOP_REPLICATION_ACTION_TYPE
import org.opensearch.commons.replication.action.StopIndexReplicationRequest
import org.opensearch.commons.utils.recreateObject
import org.opensearch.core.action.ActionListener
import org.opensearch.core.action.ActionResponse
import org.opensearch.core.common.io.stream.Writeable

/**
* Transport action plugin interfaces for the cross-cluster-replication plugin.
*/
open class ReplicationPluginInterface {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
open class ReplicationPluginInterface {
object ReplicationPluginInterface {

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Feels like the methods here should just be static methods.


/**
* Stop replication.
* @param client Node client for making transport action
* @param request The request object
* @param listener The listener for getting response
*/

open fun stopReplication(
client: Client,
request: StopIndexReplicationRequest,
listener: ActionListener<AcknowledgedResponse>
) {
val nodeClient = client as NodeClient
return nodeClient.execute(
INTERNAL_STOP_REPLICATION_ACTION_TYPE,
request,
wrapActionListener(listener) { response ->
recreateObject(response) {
AcknowledgedResponse(it)
}
}
)
}

/**
* Wrap action listener on concrete response class by a new created one on ActionResponse.
* This is required because the response may be loaded by different classloader across plugins.
* The onResponse(ActionResponse) avoids type cast exception and give a chance to recreate
* the response object.
*/
@Suppress("UNCHECKED_CAST")
private fun <Response : AcknowledgedResponse> wrapActionListener(
listener: ActionListener<Response>,
recreate: (Writeable) -> Response
): ActionListener<Response> {
return object : ActionListener<ActionResponse> {
override fun onResponse(response: ActionResponse) {
val recreated = response as? Response ?: recreate(response)
listener.onResponse(recreated)
}

override fun onFailure(exception: java.lang.Exception) {
listener.onFailure(exception)
}
} as ActionListener<Response>
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/
package org.opensearch.commons.replication.action

import org.opensearch.action.ActionType
import org.opensearch.action.support.clustermanager.AcknowledgedResponse

/**
* Information related to the transport stop replication action for the Replication plugin
*/
object ReplicationActions {

/**
* Action names for stopping replication
* STOP_REPLICATION_ACTION_NAME: action used for _replication/_stop REST API
* INTERNAL_STOP_REPLICATION_ACTION_NAME: Internal only - Used by Index Management plugin to invoke stop replication
*/
const val STOP_REPLICATION_ACTION_NAME = "indices:admin/plugins/replication/index/stop"
const val INTERNAL_STOP_REPLICATION_ACTION_NAME = "indices:internal/plugins/replication/index/stop"

/**
* Stop replication transport action types.
*/
val STOP_REPLICATION_ACTION_TYPE =
ActionType(STOP_REPLICATION_ACTION_NAME, ::AcknowledgedResponse)
val INTERNAL_STOP_REPLICATION_ACTION_TYPE =
ActionType(INTERNAL_STOP_REPLICATION_ACTION_NAME, ::AcknowledgedResponse)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/
package org.opensearch.commons.replication.action

import org.opensearch.action.ActionRequestValidationException
import org.opensearch.action.IndicesRequest
import org.opensearch.action.support.IndicesOptions
import org.opensearch.action.support.clustermanager.AcknowledgedRequest
import org.opensearch.core.common.io.stream.StreamInput
import org.opensearch.core.common.io.stream.StreamOutput
import org.opensearch.core.xcontent.ObjectParser
import org.opensearch.core.xcontent.ToXContent
import org.opensearch.core.xcontent.ToXContentObject
import org.opensearch.core.xcontent.XContentBuilder
import org.opensearch.core.xcontent.XContentParser

class StopIndexReplicationRequest :
AcknowledgedRequest<StopIndexReplicationRequest>, IndicesRequest.Replaceable, ToXContentObject {
lateinit var indexName: String
constructor(indexName: String) {
this.indexName = indexName
}

private constructor() {
}

constructor(inp: StreamInput) : super(inp) {
indexName = inp.readString()
}
companion object {
private val PARSER = ObjectParser<StopIndexReplicationRequest, Void>("StopReplicationRequestParser") {
StopIndexReplicationRequest()
}

fun fromXContent(parser: XContentParser, followerIndex: String): StopIndexReplicationRequest {
val stopIndexReplicationRequest = PARSER.parse(parser, null)
stopIndexReplicationRequest.indexName = followerIndex
return stopIndexReplicationRequest
}
}

override fun validate(): ActionRequestValidationException? {
return null
}

override fun indices(vararg indices: String?): IndicesRequest {
return this
}
override fun indices(): Array<String> {
return arrayOf(indexName)
}

override fun indicesOptions(): IndicesOptions {
return IndicesOptions.strictSingleIndexNoExpandForbidClosed()
}

override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
builder.startObject()
builder.field("indexName", indexName)
builder.endObject()
return builder
}

override fun writeTo(out: StreamOutput) {
super.writeTo(out)
out.writeString(indexName)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.commons.replication

import com.nhaarman.mockitokotlin2.whenever
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.extension.ExtendWith
import org.mockito.Mockito.any
import org.mockito.Mockito.mock
import org.mockito.Mockito.verify
import org.mockito.junit.jupiter.MockitoExtension
import org.opensearch.action.support.clustermanager.AcknowledgedResponse
import org.opensearch.transport.client.node.NodeClient
import org.opensearch.commons.replication.action.StopIndexReplicationRequest
import org.opensearch.core.action.ActionListener
import org.opensearch.core.action.ActionResponse

@ExtendWith(MockitoExtension::class)
internal class ReplicationPluginInterfaceTests {

@Test
fun `test stopReplication successful response`() {
// Mock dependencies
val client: NodeClient = mock()
val request: StopIndexReplicationRequest = mock()
val listener: ActionListener<AcknowledgedResponse> = mock()
val acknowledgedResponse = AcknowledgedResponse(true) // Successful response

// Mock the behavior of NodeClient.execute()
whenever(client.execute(any(), any(), any<ActionListener<ActionResponse>>()))
.thenAnswer { invocation ->
val actionListener = invocation.getArgument<ActionListener<ActionResponse>>(2)
actionListener.onResponse(acknowledgedResponse) // Simulate success
}

val replicationPluginInterface = ReplicationPluginInterface()
// Call method under test
replicationPluginInterface.stopReplication(client, request, listener)
// Verify that listener.onResponse is called with the correct response
verify(listener).onResponse(acknowledgedResponse)
}

@Test
fun `test stopReplication failure response`() {
// Mock dependencies
val client: NodeClient = mock()
val request: StopIndexReplicationRequest = mock()
val listener: ActionListener<AcknowledgedResponse> = mock()
val exception = Exception("Test failure")

// Mock the behavior of NodeClient.execute()
whenever(client.execute(any(), any(), any<ActionListener<ActionResponse>>()))
.thenAnswer { invocation ->
val actionListener = invocation.getArgument<ActionListener<ActionResponse>>(2)
actionListener.onFailure(exception) // Simulate failure
}

val replicationPluginInterface = ReplicationPluginInterface()
// Call method under test
replicationPluginInterface.stopReplication(client, request, listener)
// Verify that listener.onResponse is called with the correct response
verify(listener).onFailure(exception)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/
package org.opensearch.commons.replication.action

import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Assertions.assertNotNull
import org.junit.jupiter.api.Assertions.assertNull
import org.junit.jupiter.api.Test
import org.opensearch.commons.utils.recreateObject

internal class StopIndexReplicationRequestTests {
@Test
fun `Stop Replication request serialize and deserialize transport object should be equal`() {
val index = "test-idx"
val request = StopIndexReplicationRequest(index)
val recreatedRequest = recreateObject(request) { StopIndexReplicationRequest(it) }
assertNotNull(recreatedRequest)
assertEquals(request.indexName, recreatedRequest.indexName)
assertNull(recreatedRequest.validate())
}
}
Loading