Skip to content

Commit

Permalink
Check version in Streams. More Tests.
Browse files Browse the repository at this point in the history
Signed-off-by: Aleksandr Tuliakov <tulyakov@yandex-team.ru>
  • Loading branch information
MrChaos1993 committed Feb 24, 2025
1 parent 5ed6c6d commit 96302ac
Show file tree
Hide file tree
Showing 11 changed files with 211 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package org.opensearch.indexmanagement.rollup.model

import org.apache.commons.codec.digest.DigestUtils
import org.opensearch.Version
import org.opensearch.common.settings.Settings
import org.opensearch.commons.authuser.User
import org.opensearch.core.common.io.stream.StreamInput
Expand Down Expand Up @@ -96,7 +97,7 @@ data class ISMRollup(
constructor(sin: StreamInput) : this(
description = sin.readString(),
targetIndex = sin.readString(),
targetIndexSettings = if (sin.readBoolean()) {
targetIndexSettings = if (sin.version.onOrAfter(Version.V_3_0_0) && sin.readBoolean()) {
Settings.readSettingsFromStream(sin)
} else {
null
Expand Down Expand Up @@ -143,8 +144,10 @@ data class ISMRollup(
override fun writeTo(out: StreamOutput) {
out.writeString(description)
out.writeString(targetIndex)
out.writeBoolean(targetIndexSettings != null)
if (targetIndexSettings != null) Settings.writeSettingsToStream(targetIndexSettings, out)
if (out.version.onOrAfter(Version.V_3_0_0)) {
out.writeBoolean(targetIndexSettings != null)
if (targetIndexSettings != null) Settings.writeSettingsToStream(targetIndexSettings, out)
}
out.writeInt(pageSize)
out.writeVInt(dimensions.size)
for (dimension in dimensions) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package org.opensearch.indexmanagement.rollup.model

import org.opensearch.Version
import org.opensearch.common.settings.IndexScopedSettings
import org.opensearch.common.settings.Settings
import org.opensearch.commons.authuser.User
Expand Down Expand Up @@ -135,7 +136,7 @@ data class Rollup(
description = sin.readString(),
sourceIndex = sin.readString(),
targetIndex = sin.readString(),
targetIndexSettings = if (sin.readBoolean()) {
targetIndexSettings = if (sin.getVersion().onOrAfter(Version.V_3_0_0) && sin.readBoolean()) {
Settings.readSettingsFromStream(sin)
} else {
null
Expand Down Expand Up @@ -216,8 +217,10 @@ data class Rollup(
out.writeString(description)
out.writeString(sourceIndex)
out.writeString(targetIndex)
out.writeBoolean(targetIndexSettings != null)
if (targetIndexSettings != null) Settings.writeSettingsToStream(targetIndexSettings, out)
if (out.version.onOrAfter(Version.V_3_0_0)) {
out.writeBoolean(targetIndexSettings != null)
if (targetIndexSettings != null) Settings.writeSettingsToStream(targetIndexSettings, out)
}
out.writeOptionalString(metadataID)
out.writeStringArray(emptyList<String>().toTypedArray())
out.writeInt(pageSize)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ package org.opensearch.indexmanagement.indexstatemanagement.action
import org.apache.hc.core5.http.ContentType
import org.apache.hc.core5.http.io.entity.StringEntity
import org.opensearch.cluster.metadata.DataStream
import org.opensearch.cluster.metadata.IndexMetadata
import org.opensearch.common.settings.Settings
import org.opensearch.index.engine.EngineConfig
import org.opensearch.indexmanagement.common.model.dimension.DateHistogram
import org.opensearch.indexmanagement.common.model.dimension.Terms
import org.opensearch.indexmanagement.indexstatemanagement.IndexStateManagementRestTestCase
Expand Down Expand Up @@ -88,6 +91,65 @@ class RollupActionIT : IndexStateManagementRestTestCase() {
assertIndexRolledUp(indexName, policyID, rollup)
}

fun `test rollup action with specified target index settings`() {
val indexName = "${testIndexName}_index_settings"
val policyID = "${testIndexName}_policy_settings"
val targetIdxTestName = "target_rollup_settings"
val targetIndexReplicas = 0
val targetIndexCodec = "best_compression"
val rollup =
ISMRollup(
description = "basic search test",
targetIndex = targetIdxTestName,
targetIndexSettings = Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, targetIndexReplicas)
.put(EngineConfig.INDEX_CODEC_SETTING.key, targetIndexCodec)
.build(),
pageSize = 100,
dimensions =
listOf(
DateHistogram(sourceField = "tpep_pickup_datetime", fixedInterval = "1h"),
Terms("RatecodeID", "RatecodeID"),
Terms("PULocationID", "PULocationID"),
),
metrics =
listOf(
RollupMetrics(
sourceField = "passenger_count", targetField = "passenger_count",
metrics =
listOf(
Sum(), Min(), Max(),
ValueCount(), Average(),
),
),
RollupMetrics(sourceField = "total_amount", targetField = "total_amount", metrics = listOf(Max(), Min())),
),
)
val actionConfig = RollupAction(rollup, 0)
val states =
listOf(
State("rollup", listOf(actionConfig), listOf()),
)
val sourceIndexMappingString =
"\"properties\": {\"tpep_pickup_datetime\": { \"type\": \"date\" }, \"RatecodeID\": { \"type\": " +
"\"keyword\" }, \"PULocationID\": { \"type\": \"keyword\" }, \"passenger_count\": { \"type\": \"integer\" }, \"total_amount\": " +
"{ \"type\": \"double\" }}"
val policy =
Policy(
id = policyID,
description = "$testIndexName description",
schemaVersion = 1L,
lastUpdatedTime = Instant.now().truncatedTo(ChronoUnit.MILLIS),
errorNotification = randomErrorNotification(),
defaultState = states[0].name,
states = states,
)
createPolicy(policy, policyID)
createIndex(indexName, policyID, mapping = sourceIndexMappingString)

assertIndexRolledUp(indexName, policyID, rollup)
}

fun `test data stream rollup action`() {
val dataStreamName = "${testIndexName}_data_stream"
val policyID = "${testIndexName}_rollup_policy"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,13 @@ abstract class RollupRestTestCase : IndexManagementRestTestCase() {
return getRollup(rollupId = rollupId)
}

// TODO: can be replaced with createRandomRollup if implement assertEqual for mappings with "dynamic"=true fields
protected fun createRandomRollupWithoutTargetSettings(refresh: Boolean = true): Rollup {
val rollup = randomRollup().copy(targetIndexSettings = null)
val rollupId = createRollup(rollup, rollupId = rollup.id, refresh = refresh).id
return getRollup(rollupId = rollupId)
}

// TODO: Maybe clean-up and use XContentFactory.jsonBuilder() to create mappings json
protected fun createRollupMappingString(rollup: Rollup): String {
var mappingString = ""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,12 @@

package org.opensearch.indexmanagement.rollup

import org.opensearch.cluster.metadata.IndexMetadata
import org.opensearch.common.settings.Settings
import org.opensearch.common.xcontent.XContentFactory
import org.opensearch.core.xcontent.ToXContent
import org.opensearch.index.codec.CodecService
import org.opensearch.index.engine.EngineConfig
import org.opensearch.index.query.TermQueryBuilder
import org.opensearch.indexmanagement.common.model.dimension.DateHistogram
import org.opensearch.indexmanagement.common.model.dimension.Dimension
Expand All @@ -33,8 +37,9 @@ import org.opensearch.indexmanagement.rollup.model.metric.Metric
import org.opensearch.indexmanagement.rollup.model.metric.Min
import org.opensearch.indexmanagement.rollup.model.metric.Sum
import org.opensearch.indexmanagement.rollup.model.metric.ValueCount
import org.opensearch.test.OpenSearchTestCase
import org.opensearch.test.rest.OpenSearchRestTestCase
import java.util.Locale
import java.util.*

fun randomInterval(): String = if (OpenSearchRestTestCase.randomBoolean()) randomFixedInterval() else randomCalendarInterval()

Expand Down Expand Up @@ -98,6 +103,14 @@ fun randomRollupDimensions(): List<Dimension> {
return dimensions.toList()
}

val codecs = listOf(CodecService.DEFAULT_CODEC, CodecService.LZ4, CodecService.BEST_COMPRESSION_CODEC, CodecService.ZLIB)

fun randomSettings(): Settings = Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, OpenSearchTestCase.randomIntBetween(0, 10))
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, OpenSearchTestCase.randomIntBetween(1, 10))
.put(EngineConfig.INDEX_CODEC_SETTING.key, OpenSearchRestTestCase.randomSubsetOf(1, codecs).first())
.build()

fun randomRollup(): Rollup {
val enabled = OpenSearchRestTestCase.randomBoolean()
return Rollup(
Expand All @@ -112,7 +125,7 @@ fun randomRollup(): Rollup {
description = OpenSearchRestTestCase.randomAlphaOfLength(10),
sourceIndex = OpenSearchRestTestCase.randomAlphaOfLength(10).lowercase(Locale.ROOT),
targetIndex = OpenSearchRestTestCase.randomAlphaOfLength(10).lowercase(Locale.ROOT),
targetIndexSettings = null,
targetIndexSettings = if (OpenSearchRestTestCase.randomBoolean()) null else randomSettings(),
metadataID = if (OpenSearchRestTestCase.randomBoolean()) null else OpenSearchRestTestCase.randomAlphaOfLength(10),
roles = emptyList(),
pageSize = OpenSearchRestTestCase.randomIntBetween(1, 10000),
Expand Down Expand Up @@ -173,7 +186,7 @@ fun randomExplainRollup(): ExplainRollup {
fun randomISMRollup(): ISMRollup = ISMRollup(
description = OpenSearchRestTestCase.randomAlphaOfLength(10),
targetIndex = OpenSearchRestTestCase.randomAlphaOfLength(10).lowercase(Locale.ROOT),
targetIndexSettings = null,
targetIndexSettings = if (OpenSearchRestTestCase.randomBoolean()) null else randomSettings(),
pageSize = OpenSearchRestTestCase.randomIntBetween(1, 10000),
dimensions = randomRollupDimensions(),
metrics = OpenSearchRestTestCase.randomList(20, ::randomRollupMetrics).distinctBy { it.targetField },
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ class ISMRollupTests : OpenSearchTestCase() {

assertEquals(sourceIndex, rollup.sourceIndex)
assertEquals(ismRollup.targetIndex, rollup.targetIndex)
assertEquals(ismRollup.targetIndexSettings, rollup.targetIndexSettings)
assertEquals(ismRollup.pageSize, rollup.pageSize)
assertEquals(ismRollup.dimensions, rollup.dimensions)
assertEquals(ismRollup.metrics, rollup.metrics)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,19 @@ class RollupTests : OpenSearchTestCase() {
assertFailsWith(SettingsException::class, "Unknown property was `index.codec1`") {
randomRollup().copy(targetIndexSettings = Settings.builder().put("index.codec1", "zlib").build())
}
}

fun `test rollup with single setting in target index settings`() {
val sb = Settings.builder()
sb.put("index.codec", "zlib")
randomRollup().copy(targetIndexSettings = sb.build())
}

fun `test rollup with multiple setting in target index settings`() {
val sb = Settings.builder()
sb.put("index.number_of_replicas", 1)
sb.put("index.codec", "zlib")
sb.put("index.soft_deletes.retention_lease.period", "1h")
randomRollup().copy(targetIndexSettings = sb.build())
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import org.opensearch.indexmanagement.rollup.randomRollup
import org.opensearch.indexmanagement.rollup.randomRollupMetadata
import org.opensearch.indexmanagement.rollup.randomRollupMetrics
import org.opensearch.indexmanagement.rollup.randomRollupStats
import org.opensearch.indexmanagement.rollup.randomSettings
import org.opensearch.indexmanagement.rollup.randomSum
import org.opensearch.indexmanagement.rollup.randomTerms
import org.opensearch.indexmanagement.rollup.randomValueCount
Expand Down Expand Up @@ -126,6 +127,14 @@ class WriteableTests : OpenSearchTestCase() {
assertTrue("roles field in rollup model is deprecated and should be parsed to empty list.", streamedRollup.roles.isEmpty())
}

fun `test rollup as stream with target index settings`() {
val rollup = randomRollup().copy(delay = randomLongBetween(0, 60000000), targetIndexSettings = randomSettings())
val out = BytesStreamOutput().also { rollup.writeTo(it) }
val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes)
val streamedRollup = Rollup(sin)
assertEquals("Round tripping Rollup stream with target index settings doesn't work", rollup, streamedRollup)
}

fun `test explain rollup as stream`() {
val explainRollup = randomExplainRollup()
val out = BytesStreamOutput().also { explainRollup.writeTo(it) }
Expand Down Expand Up @@ -165,4 +174,16 @@ class WriteableTests : OpenSearchTestCase() {
val streamedISMRollup = ISMRollup(sin)
assertEquals("Round tripping ISMRollup stream doesn't work", ismRollup, streamedISMRollup)
}

fun `test ism rollup as stream with target index settings`() {
val ismRollup = randomISMRollup().copy(targetIndexSettings = randomSettings())
val out = BytesStreamOutput().also { ismRollup.writeTo(it) }
val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes)
val streamedISMRollup = ISMRollup(sin)
assertEquals(
"Round tripping ISMRollup stream with target index settings doesn't work",
ismRollup,
streamedISMRollup,
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import org.opensearch.indexmanagement.rollup.randomMax
import org.opensearch.indexmanagement.rollup.randomMin
import org.opensearch.indexmanagement.rollup.randomRollup
import org.opensearch.indexmanagement.rollup.randomRollupMetrics
import org.opensearch.indexmanagement.rollup.randomSettings
import org.opensearch.indexmanagement.rollup.randomSum
import org.opensearch.indexmanagement.rollup.randomTerms
import org.opensearch.indexmanagement.rollup.randomValueCount
Expand Down Expand Up @@ -120,13 +121,28 @@ class XContentTests : OpenSearchTestCase() {
assertEquals("Round tripping Rollup without type doesn't work", rollup.copy(roles = listOf()), parsedRollup)
}

fun `test rollup parsing with target index settings`() {
val rollup = randomRollup().copy(delay = randomLongBetween(0, 60000000), targetIndexSettings = randomSettings())
val rollupString = rollup.toJsonString(XCONTENT_WITHOUT_TYPE)
val parsedRollup = Rollup.parse(parser(rollupString), rollup.id, rollup.seqNo, rollup.primaryTerm)
// roles are deprecated and not populated in toXContent and parsed as part of parse
assertEquals("Round tripping Rollup with target index settings doesn't work", rollup.copy(roles = listOf()), parsedRollup)
}

fun `test ism rollup parsing`() {
val ismRollup = randomISMRollup()
val ismRollupString = ismRollup.toJsonString()
val parsedISMRollup = ISMRollup.parse(parser(ismRollupString))
assertEquals("Round tripping ISMRollup doesn't work", ismRollup, parsedISMRollup)
}

fun `test ism rollup parsing with target index settings`() {
val ismRollup = randomISMRollup().copy(targetIndexSettings = randomSettings())
val ismRollupString = ismRollup.toJsonString()
val parsedISMRollup = ISMRollup.parse(parser(ismRollupString))
assertEquals("Round tripping ISMRollup with target index settings doesn't work", ismRollup, parsedISMRollup)
}

private fun parser(xc: String): XContentParser {
val parser = XContentType.JSON.xContent().createParser(xContentRegistry(), LoggingDeprecationHandler.INSTANCE, xc)
parser.nextToken()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ class RestIndexRollupActionIT : RollupRestAPITestCase() {

@Throws(Exception::class)
fun `test mappings after rollup creation`() {
createRandomRollup()
createRandomRollupWithoutTargetSettings()

val response = client().makeRequest("GET", "/$INDEX_MANAGEMENT_INDEX/_mapping")
val parserMap = createParser(XContentType.JSON.xContent(), response.entity.content).map() as Map<String, Map<String, Any>>
Expand Down
Loading

0 comments on commit 96302ac

Please sign in to comment.