Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding replication (CCR) plugin interface and classes to common-utils #667

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/
package org.opensearch.commons.replication

import org.opensearch.action.support.master.AcknowledgedResponse
import org.opensearch.client.node.NodeClient
import org.opensearch.commons.replication.action.ReplicationActions.UNFOLLOW_REPLICATION_ACTION_TYPE
import org.opensearch.commons.replication.action.StopIndexReplicationRequest
import org.opensearch.commons.utils.recreateObject
import org.opensearch.core.action.ActionListener
import org.opensearch.core.action.ActionResponse
import org.opensearch.core.common.io.stream.Writeable

/**
* Transport action plugin interfaces for the cross-cluster-replication plugin.
*/
object ReplicationPluginInterface {

/**
* Stop replication.
* @param client Node client for making transport action
* @param request The request object
* @param listener The listener for getting response
*/

fun stopReplication(
client: NodeClient,
request: StopIndexReplicationRequest,
listener: ActionListener<AcknowledgedResponse>
) {
return client.execute(
UNFOLLOW_REPLICATION_ACTION_TYPE,
request,
wrapActionListener(listener) { response ->
recreateObject(response) {
AcknowledgedResponse(it)
}
}
)
}

/**
* Wrap action listener on concrete response class by a new created one on ActionResponse.
* This is required because the response may be loaded by different classloader across plugins.
* The onResponse(ActionResponse) avoids type cast exception and give a chance to recreate
* the response object.
*/
@Suppress("UNCHECKED_CAST")
private fun <Response : AcknowledgedResponse> wrapActionListener(
listener: ActionListener<Response>,
recreate: (Writeable) -> Response
): ActionListener<Response> {
return object : ActionListener<ActionResponse> {
override fun onResponse(response: ActionResponse) {
val recreated = response as? Response ?: recreate(response)
listener.onResponse(recreated)
}

override fun onFailure(exception: java.lang.Exception) {
listener.onFailure(exception)
}
} as ActionListener<Response>
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/
package org.opensearch.commons.replication.action

import org.opensearch.action.ActionType
import org.opensearch.action.support.master.AcknowledgedResponse

/**
* Information related to the transport stop replication action for the Replication plugin
*/
object ReplicationActions {

/**
* Action names for stopping replication
* STOP_REPLICATION_ACTION_NAME: action used for _stop REST API
* UNFOLLOW_REPLICATION_ACTION_NAME: internal action used for inter-plugin communication i.e. by ism to invoke stop
* replication.
*/
const val STOP_REPLICATION_ACTION_NAME = "indices:admin/plugins/replication/index/stop"
const val UNFOLLOW_REPLICATION_ACTION_NAME = "indices:admin/plugins/replication/index/unfollow"

/**
* Stop replication transport action types.
*/
val STOP_REPLICATION_ACTION_TYPE =
ActionType(STOP_REPLICATION_ACTION_NAME, ::AcknowledgedResponse)
val UNFOLLOW_REPLICATION_ACTION_TYPE =
ActionType(UNFOLLOW_REPLICATION_ACTION_NAME, ::AcknowledgedResponse)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package org.opensearch.commons.replication.action

import org.opensearch.action.ActionRequestValidationException
import org.opensearch.action.IndicesRequest
import org.opensearch.action.support.IndicesOptions
import org.opensearch.action.support.master.AcknowledgedRequest
import org.opensearch.core.common.io.stream.StreamInput
import org.opensearch.core.common.io.stream.StreamOutput
import org.opensearch.core.xcontent.ObjectParser
import org.opensearch.core.xcontent.ToXContent
import org.opensearch.core.xcontent.ToXContentObject
import org.opensearch.core.xcontent.XContentBuilder
import org.opensearch.core.xcontent.XContentParser

class StopIndexReplicationRequest :
AcknowledgedRequest<StopIndexReplicationRequest>, IndicesRequest.Replaceable, ToXContentObject {
lateinit var indexName: String
constructor(indexName: String) {
this.indexName = indexName
}

private constructor() {
}

constructor(inp: StreamInput) : super(inp) {
indexName = inp.readString()
}
companion object {
private val PARSER = ObjectParser<StopIndexReplicationRequest, Void>("StopReplicationRequestParser") {
StopIndexReplicationRequest()
}

fun fromXContent(parser: XContentParser, followerIndex: String): StopIndexReplicationRequest {
val stopIndexReplicationRequest = PARSER.parse(parser, null)
stopIndexReplicationRequest.indexName = followerIndex
return stopIndexReplicationRequest
}
}

override fun validate(): ActionRequestValidationException? {
return null
}

override fun indices(vararg indices: String?): IndicesRequest {
return this
}
override fun indices(): Array<String> {
return arrayOf(indexName)
}

override fun indicesOptions(): IndicesOptions {
return IndicesOptions.strictSingleIndexNoExpandForbidClosed()
}

override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
builder.startObject()
builder.field("indexName", indexName)
builder.endObject()
return builder
}

override fun writeTo(out: StreamOutput) {
super.writeTo(out)
out.writeString(indexName)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.commons.replication

import com.nhaarman.mockitokotlin2.whenever
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.extension.ExtendWith
import org.mockito.Answers
import org.mockito.ArgumentMatchers
import org.mockito.Mock
import org.mockito.Mockito
import org.mockito.junit.jupiter.MockitoExtension
import org.opensearch.action.ActionType
import org.opensearch.action.support.master.AcknowledgedResponse
import org.opensearch.client.node.NodeClient
import org.opensearch.commons.replication.action.StopIndexReplicationRequest
import org.opensearch.core.action.ActionListener

@Suppress("UNCHECKED_CAST")
@ExtendWith(MockitoExtension::class)
internal class ReplicationPluginInterfaceTests {

@Mock(answer = Answers.RETURNS_DEEP_STUBS)
private lateinit var client: NodeClient

@Test
fun stopReplication() {
val request = Mockito.mock(StopIndexReplicationRequest::class.java)
val response = AcknowledgedResponse(true)
val listener: ActionListener<AcknowledgedResponse> =
Mockito.mock(ActionListener::class.java) as ActionListener<AcknowledgedResponse>

Mockito.doAnswer {
(it.getArgument(2) as ActionListener<AcknowledgedResponse>)
.onResponse(response)
}.whenever(client).execute(Mockito.any(ActionType::class.java), Mockito.any(), Mockito.any())

ReplicationPluginInterface.stopReplication(client, request, listener)
Mockito.verify(listener, Mockito.times(1)).onResponse(ArgumentMatchers.eq(response))
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/
package org.opensearch.commons.replication.action

import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Assertions.assertNotNull
import org.junit.jupiter.api.Assertions.assertNull
import org.junit.jupiter.api.Test
import org.opensearch.commons.utils.recreateObject

internal class StopIndexReplicationRequestTests {
@Test
fun `Stop Replication request serialize and deserialize transport object should be equal`() {
val index = "test-idx"
val request = StopIndexReplicationRequest(index)
val recreatedRequest = recreateObject(request) { StopIndexReplicationRequest(it) }
assertNotNull(recreatedRequest)
assertEquals(request.indexName, recreatedRequest.indexName)
assertNull(recreatedRequest.validate())
}
}
Loading