Skip to content

Commit

Permalink
write source tables to metadata cache
Browse files Browse the repository at this point in the history
Signed-off-by: Sean Kao <seankao@amazon.com>
  • Loading branch information
seankao-az committed Oct 31, 2024
1 parent b286b98 commit 1eb4646
Show file tree
Hide file tree
Showing 5 changed files with 155 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ object FlintSparkIndexFactory extends Logging {
/**
 * Reads the value stored under `key` and returns its elements as strings.
 *
 * @param map
 *   map to look the key up in
 * @param key
 *   key whose value is expected to be a `java.util.ArrayList`
 * @return
 *   `toString` of every list element, in list order; an empty array when the key is absent or
 *   its value is not a `java.util.ArrayList`
 */
private def getArrayString(map: java.util.Map[String, AnyRef], key: String): Array[String] = {
  map.get(key) match {
    case list: java.util.ArrayList[_] =>
      // Convert with toString instead of casting to String, so that a non-String element
      // does not raise ClassCastException.
      list.toArray.map(_.toString)
    case _ => Array.empty[String]
  }
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,8 @@ import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex

import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.execution.command.DDLUtils
import org.apache.spark.sql.flint.{loadTable, parseTableName, qualifyTableName}
import org.apache.spark.sql.flint.{loadTable, parseTableName}

/**
* Flint Spark validation helper.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import scala.collection.JavaConverters.mapAsScalaMapConverter
import org.opensearch.flint.common.metadata.FlintMetadata
import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry
import org.opensearch.flint.spark.FlintSparkIndexOptions
import org.opensearch.flint.spark.mv.FlintSparkMaterializedView.MV_INDEX_TYPE
import org.opensearch.flint.spark.scheduler.util.IntervalSchedulerParser

/**
Expand Down Expand Up @@ -47,8 +48,6 @@ case class FlintMetadataCache(
object FlintMetadataCache {

val metadataCacheVersion = "1.0"
val mockTableName =
"dataSourceName.default.logGroups(logGroupIdentifier:['arn:aws:logs:us-east-1:123456:test-llt-xa', 'arn:aws:logs:us-east-1:123456:sample-lg-1'])"

def apply(metadata: FlintMetadata): FlintMetadataCache = {
val indexOptions = FlintSparkIndexOptions(
Expand All @@ -61,18 +60,22 @@ object FlintMetadataCache {
} else {
None
}
val sourceTables = metadata.kind match {
case MV_INDEX_TYPE =>
metadata.properties.get("sourceTables") match {
case list: java.util.ArrayList[_] =>
list.toArray.map(_.toString)
case _ => Array.empty[String]
}
case _ => Array(metadata.source)
}
val lastRefreshTime: Option[Long] = metadata.latestLogEntry.flatMap { entry =>
entry.lastRefreshCompleteTime match {
case FlintMetadataLogEntry.EMPTY_TIMESTAMP => None
case timestamp => Some(timestamp)
}
}

// TODO: get source tables from metadata
FlintMetadataCache(
metadataCacheVersion,
refreshInterval,
Array(mockTableName),
lastRefreshTime)
FlintMetadataCache(metadataCacheVersion, refreshInterval, sourceTables, lastRefreshTime)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,79 @@ class FlintMetadataCacheSuite extends AnyFlatSpec with Matchers {
"",
Map.empty[String, Any])

// Builds a cache from skipping index metadata and checks that the version, the refresh
// interval (600 for "10 Minutes"), the single source table taken from "_meta.source" and
// the last refresh time of the attached log entry are all carried over.
it should "construct from skipping index FlintMetadata" in {
  val content =
    """ {
      | "_meta": {
      | "kind": "skipping",
      | "source": "spark_catalog.default.test_table",
      | "options": {
      | "auto_refresh": "true",
      | "refresh_interval": "10 Minutes"
      | }
      | },
      | "properties": {
      | "age": {
      | "type": "integer"
      | }
      | }
      | }
      |""".stripMargin
  val metadata = FlintOpenSearchIndexMetadataService
    .deserialize(content)
    .copy(latestLogEntry = Some(flintMetadataLogEntry))

  val metadataCache = FlintMetadataCache(metadata)
  metadataCache.metadataCacheVersion shouldBe FlintMetadataCache.metadataCacheVersion
  metadataCache.refreshInterval.get shouldBe 600
  metadataCache.sourceTables shouldBe Array("spark_catalog.default.test_table")
  metadataCache.lastRefreshTime.get shouldBe 1234567890123L
}

// Covering index metadata: the cache is expected to expose the cache version, a refresh
// interval of 600, the table named in "_meta.source" as its only source table, and the
// last refresh time of the attached log entry.
it should "construct from covering index FlintMetadata" in {
  val coveringIndexJson =
    """ {
      | "_meta": {
      | "kind": "covering",
      | "source": "spark_catalog.default.test_table",
      | "options": {
      | "auto_refresh": "true",
      | "refresh_interval": "10 Minutes"
      | }
      | },
      | "properties": {
      | "age": {
      | "type": "integer"
      | }
      | }
      | }
      |""".stripMargin
  val deserialized = FlintOpenSearchIndexMetadataService.deserialize(coveringIndexJson)
  val withLogEntry = deserialized.copy(latestLogEntry = Some(flintMetadataLogEntry))

  val cache = FlintMetadataCache(withLogEntry)
  cache.metadataCacheVersion shouldBe FlintMetadataCache.metadataCacheVersion
  cache.refreshInterval.get shouldBe 600
  cache.sourceTables shouldBe Array("spark_catalog.default.test_table")
  cache.lastRefreshTime.get shouldBe 1234567890123L
}

it should "construct from materialized view FlintMetadata" in {
val content =
""" {
| "_meta": {
| "kind": "mv",
| "source": "spark_catalog.default.wrong_table",
| "options": {
| "auto_refresh": "true",
| "refresh_interval": "10 Minutes"
| },
| "properties": {
| "sourceTables": [
| "spark_catalog.default.test_table",
| "spark_catalog.default.another_table"
| ]
| }
| },
| "properties": {
Expand All @@ -45,7 +110,9 @@ class FlintMetadataCacheSuite extends AnyFlatSpec with Matchers {
val metadataCache = FlintMetadataCache(metadata)
metadataCache.metadataCacheVersion shouldBe FlintMetadataCache.metadataCacheVersion
metadataCache.refreshInterval.get shouldBe 600
metadataCache.sourceTables shouldBe Array(FlintMetadataCache.mockTableName)
metadataCache.sourceTables shouldBe Array(
"spark_catalog.default.test_table",
"spark_catalog.default.another_table")
metadataCache.lastRefreshTime.get shouldBe 1234567890123L
}

Expand Down Expand Up @@ -73,7 +140,7 @@ class FlintMetadataCacheSuite extends AnyFlatSpec with Matchers {
val metadataCache = FlintMetadataCache(metadata)
metadataCache.metadataCacheVersion shouldBe FlintMetadataCache.metadataCacheVersion
metadataCache.refreshInterval shouldBe empty
metadataCache.sourceTables shouldBe Array(FlintMetadataCache.mockTableName)
metadataCache.sourceTables shouldBe empty
metadataCache.lastRefreshTime shouldBe empty
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry
import org.opensearch.flint.core.FlintOptions
import org.opensearch.flint.core.storage.{FlintOpenSearchClient, FlintOpenSearchIndexMetadataService}
import org.opensearch.flint.spark.{FlintSparkIndexOptions, FlintSparkSuite}
import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.getSkippingIndexName
import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex.COVERING_INDEX_TYPE
import org.opensearch.flint.spark.mv.FlintSparkMaterializedView.MV_INDEX_TYPE
import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.{getSkippingIndexName, SKIPPING_INDEX_TYPE}
import org.scalatest.Entry
import org.scalatest.matchers.should.Matchers

Expand Down Expand Up @@ -78,9 +80,9 @@ class FlintOpenSearchMetadataCacheWriterITSuite extends FlintSparkSuite with Mat
| {
| "_meta": {
| "version": "${current()}",
| "name": "${testFlintIndex}",
| "name": "$testFlintIndex",
| "kind": "test_kind",
| "source": "test_source_table",
| "source": "$testTable",
| "indexedColumns": [
| {
| "test_field": "spark_type"
Expand All @@ -92,10 +94,10 @@ class FlintOpenSearchMetadataCacheWriterITSuite extends FlintSparkSuite with Mat
| "properties": {
| "metadataCacheVersion": "${FlintMetadataCache.metadataCacheVersion}",
| "refreshInterval": 600,
| "sourceTables": ["${FlintMetadataCache.mockTableName}"],
| "lastRefreshTime": ${testLastRefreshCompleteTime}
| "sourceTables": ["$testTable"],
| "lastRefreshTime": $testLastRefreshCompleteTime
| },
| "latestId": "${testLatestId}"
| "latestId": "$testLatestId"
| },
| "properties": {
| "test_field": {
Expand All @@ -107,7 +109,7 @@ class FlintOpenSearchMetadataCacheWriterITSuite extends FlintSparkSuite with Mat
val builder = new FlintMetadata.Builder
builder.name(testFlintIndex)
builder.kind("test_kind")
builder.source("test_source_table")
builder.source(testTable)
builder.addIndexedColumn(Map[String, AnyRef]("test_field" -> "spark_type").asJava)
builder.options(
Map("auto_refresh" -> "true", "refresh_interval" -> "10 Minutes")
Expand All @@ -133,10 +135,67 @@ class FlintOpenSearchMetadataCacheWriterITSuite extends FlintSparkSuite with Mat
"metadataCacheVersion",
FlintMetadataCache.metadataCacheVersion),
Entry("lastRefreshTime", testLastRefreshCompleteTime))
}

// Registers one test per index kind (skipping, covering). Each test creates an index whose
// metadata names testTable as its source, writes the metadata cache, and then checks that
// the stored "sourceTables" property holds exactly that table.
for (kind <- Seq(SKIPPING_INDEX_TYPE, COVERING_INDEX_TYPE)) {
  test(s"write metadata cache to $kind index mappings with source tables") {
    val indexJson =
      s""" {
         | "_meta": {
         | "kind": "$kind",
         | "source": "$testTable"
         | },
         | "properties": {
         | "age": {
         | "type": "integer"
         | }
         | }
         | }
         |""".stripMargin
    val indexMetadata = FlintOpenSearchIndexMetadataService
      .deserialize(indexJson)
      .copy(latestLogEntry = Some(flintMetadataLogEntry))
    flintClient.createIndex(testFlintIndex, indexMetadata)
    flintMetadataCacheWriter.updateMetadataCache(testFlintIndex, indexMetadata)

    // Read the mappings back and compare the cached source tables regardless of order.
    val cachedSourceTables = flintIndexMetadataService
      .getIndexMetadata(testFlintIndex)
      .properties
      .get("sourceTables")
      .asInstanceOf[List[String]]
      .toArray
    cachedSourceTables should contain theSameElementsAs Array(testTable)
  }
}

// Materialized view metadata lists its source tables under "_meta.properties.sourceTables".
// After the index is created and the metadata cache is written, the stored "sourceTables"
// property is expected to contain both listed tables (order is not checked).
test("write metadata cache to materialized view index mappings with source tables") {
  val testTable2 = "spark_catalog.default.metadatacache_test2"
  val content =
    s""" {
       | "_meta": {
       | "kind": "$MV_INDEX_TYPE",
       | "properties": {
       | "sourceTables": [
       | "$testTable", "$testTable2"
       | ]
       | }
       | },
       | "properties": {
       | "age": {
       | "type": "integer"
       | }
       | }
       | }
       |""".stripMargin
  val metadata = FlintOpenSearchIndexMetadataService
    .deserialize(content)
    .copy(latestLogEntry = Some(flintMetadataLogEntry))
  flintClient.createIndex(testFlintIndex, metadata)
  flintMetadataCacheWriter.updateMetadataCache(testFlintIndex, metadata)

  val properties = flintIndexMetadataService.getIndexMetadata(testFlintIndex).properties
  properties
    .get("sourceTables")
    .asInstanceOf[List[String]]
    .toArray should contain theSameElementsAs Array(testTable, testTable2)
}

test("write metadata cache to index mappings with refresh interval") {
Expand Down Expand Up @@ -169,10 +228,6 @@ class FlintOpenSearchMetadataCacheWriterITSuite extends FlintSparkSuite with Mat
FlintMetadataCache.metadataCacheVersion),
Entry("refreshInterval", 600),
Entry("lastRefreshTime", testLastRefreshCompleteTime))
properties
.get("sourceTables")
.asInstanceOf[List[String]]
.toArray should contain theSameElementsAs Array(FlintMetadataCache.mockTableName)
}

test("exclude refresh interval in metadata cache when auto refresh is false") {
Expand Down Expand Up @@ -203,10 +258,6 @@ class FlintOpenSearchMetadataCacheWriterITSuite extends FlintSparkSuite with Mat
"metadataCacheVersion",
FlintMetadataCache.metadataCacheVersion),
Entry("lastRefreshTime", testLastRefreshCompleteTime))
properties
.get("sourceTables")
.asInstanceOf[List[String]]
.toArray should contain theSameElementsAs Array(FlintMetadataCache.mockTableName)
}

test("exclude last refresh time in metadata cache when index has not been refreshed") {
Expand All @@ -220,10 +271,6 @@ class FlintOpenSearchMetadataCacheWriterITSuite extends FlintSparkSuite with Mat
properties should have size 2
properties should contain(
Entry("metadataCacheVersion", FlintMetadataCache.metadataCacheVersion))
properties
.get("sourceTables")
.asInstanceOf[List[String]]
.toArray should contain theSameElementsAs Array(FlintMetadataCache.mockTableName)
}

test("write metadata cache to index mappings and preserve other index metadata") {
Expand Down Expand Up @@ -257,10 +304,6 @@ class FlintOpenSearchMetadataCacheWriterITSuite extends FlintSparkSuite with Mat
"metadataCacheVersion",
FlintMetadataCache.metadataCacheVersion),
Entry("lastRefreshTime", testLastRefreshCompleteTime))
properties
.get("sourceTables")
.asInstanceOf[List[String]]
.toArray should contain theSameElementsAs Array(FlintMetadataCache.mockTableName)

val newContent =
""" {
Expand Down Expand Up @@ -291,10 +334,6 @@ class FlintOpenSearchMetadataCacheWriterITSuite extends FlintSparkSuite with Mat
"metadataCacheVersion",
FlintMetadataCache.metadataCacheVersion),
Entry("lastRefreshTime", testLastRefreshCompleteTime))
properties
.get("sourceTables")
.asInstanceOf[List[String]]
.toArray should contain theSameElementsAs Array(FlintMetadataCache.mockTableName)
}

Seq(
Expand All @@ -309,7 +348,7 @@ class FlintOpenSearchMetadataCacheWriterITSuite extends FlintSparkSuite with Mat
| {
| "metadataCacheVersion": "${FlintMetadataCache.metadataCacheVersion}",
| "refreshInterval": 600,
| "sourceTables": ["${FlintMetadataCache.mockTableName}"]
| "sourceTables": ["$testTable"]
| }
|""".stripMargin),
(
Expand All @@ -318,7 +357,7 @@ class FlintOpenSearchMetadataCacheWriterITSuite extends FlintSparkSuite with Mat
s"""
| {
| "metadataCacheVersion": "${FlintMetadataCache.metadataCacheVersion}",
| "sourceTables": ["${FlintMetadataCache.mockTableName}"]
| "sourceTables": ["$testTable"]
| }
|""".stripMargin),
(
Expand All @@ -327,7 +366,7 @@ class FlintOpenSearchMetadataCacheWriterITSuite extends FlintSparkSuite with Mat
s"""
| {
| "metadataCacheVersion": "${FlintMetadataCache.metadataCacheVersion}",
| "sourceTables": ["${FlintMetadataCache.mockTableName}"]
| "sourceTables": ["$testTable"]
| }
|""".stripMargin)).foreach { case (refreshMode, optionsMap, expectedJson) =>
test(s"write metadata cache for $refreshMode") {
Expand Down Expand Up @@ -402,7 +441,7 @@ class FlintOpenSearchMetadataCacheWriterITSuite extends FlintSparkSuite with Mat
| {
| "metadataCacheVersion": "${FlintMetadataCache.metadataCacheVersion}",
| "refreshInterval": 600,
| "sourceTables": ["${FlintMetadataCache.mockTableName}"]
| "sourceTables": ["$testTable"]
| }
|""".stripMargin)

Expand Down

0 comments on commit 1eb4646

Please sign in to comment.