diff --git a/docs/opensearch-table.md b/docs/opensearch-table.md
index 8730f3ef8..f0cbe47a1 100644
--- a/docs/opensearch-table.md
+++ b/docs/opensearch-table.md
@@ -47,6 +47,37 @@ Using a wildcard index name:
 val df = spark.sql("SELECT * FROM dev.default.`my_index*`")
 df.show()
 ```
+## Data Types
+The following table defines the data type mapping between OpenSearch index field types and Spark data types.
+
+| **OpenSearch Field Type** | **Spark Data Type**               |
+|---------------------------|-----------------------------------|
+| boolean                   | BooleanType                       |
+| long                      | LongType                          |
+| integer                   | IntegerType                       |
+| short                     | ShortType                         |
+| byte                      | ByteType                          |
+| double                    | DoubleType                        |
+| float                     | FloatType                         |
+| date(Timestamp)           | TimestampType                     |
+| date(Date)                | DateType                          |
+| keyword                   | StringType, VarcharType, CharType |
+| text                      | StringType(meta(osType)=text)     |
+| object                    | StructType                        |
+| alias                     | Inherits referenced field type    |
+
+* The OpenSearch data type date is mapped to a Spark data type based on its format:
+  * Mapped to DateType if format = strict_date (format = date is also supported; this may change in the future)
+  * Mapped to TimestampType if format = strict_date_optional_time_nanos (format = strict_date_optional_time
+    and epoch_millis are also supported; this may change in the future)
+* Spark data types VarcharType(length) and CharType(length) are both currently mapped to the Flint data
+  type *keyword*, dropping their length property. On the other hand, the Flint data type *keyword* only
+  maps to StringType.
+* Spark data type MapType is mapped to an empty OpenSearch object; the inner fields then rely on
+  dynamic mapping. On the other hand, the Flint data type *object* only maps to StructType.
+* Spark data type DecimalType is mapped to an OpenSearch double. On the other hand, the Flint data type
+  *double* only maps to DoubleType.
+* OpenSearch alias fields provide alternative names for existing fields in the schema without duplicating data. They inherit the data type and nullability of the referenced field and resolve dynamically to the referenced field in queries.
 
 Join two indices
 ```scala
diff --git a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/datatype/FlintDataType.scala b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/datatype/FlintDataType.scala
index 19fe28a2d..3d20593da 100644
--- a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/datatype/FlintDataType.scala
+++ b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/datatype/FlintDataType.scala
@@ -30,6 +30,8 @@ object FlintDataType {
     "dateFormat" -> DateFormatter.defaultPattern,
     "timestampFormat" -> STRICT_DATE_OPTIONAL_TIME_FORMATTER_WITH_NANOS)
 
+  val METADATA_ALIAS_PATH_NAME = "aliasPath"
+
   /**
    * parse Flint metadata and extract properties to StructType.
   */
@@ -39,14 +41,38 @@ object FlintDataType {
 
   def deserializeJValue(json: JValue): StructType = {
     val properties = (json \ "properties").extract[Map[String, JValue]]
-    val fields = properties.map { case (fieldName, fieldProperties) =>
-      deserializeFiled(fieldName, fieldProperties)
+    val (aliasProps, normalProps) = properties.partition { case (_, fieldProperties) =>
+      (fieldProperties \ "type") match {
+        case JString("alias") => true
+        case _ => false
+      }
     }
-    StructType(fields.toSeq)
+    val fields: Seq[StructField] = normalProps.map { case (fieldName, fieldProperties) =>
+      deserializeField(fieldName, fieldProperties)
+    }.toSeq
+
+    val normalFieldMap: Map[String, StructField] = fields.map(f => f.name -> f).toMap
+
+    val aliasFields: Seq[StructField] = aliasProps.map { case (fieldName, fieldProperties) =>
+      val aliasPath = (fieldProperties \ "path").extract[String]
+      if (!normalFieldMap.contains(aliasPath)) {
+        throw new IllegalStateException(
+          s"Alias field [$fieldName] references undefined field [$aliasPath]")
+      }
+      val metadataBuilder = new MetadataBuilder()
+      metadataBuilder.putString(METADATA_ALIAS_PATH_NAME, aliasPath)
+      DataTypes.createStructField(
+        fieldName,
+        normalFieldMap(aliasPath).dataType,
+        true,
+        metadataBuilder.build())
+    }.toSeq
+
+    StructType(fields ++ aliasFields)
   }
 
-  def deserializeFiled(fieldName: String, fieldProperties: JValue): StructField = {
+  def deserializeField(fieldName: String, fieldProperties: JValue): StructField = {
     val metadataBuilder = new MetadataBuilder()
     val dataType = fieldProperties \ "type" match {
       // boolean
diff --git a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/json/FlintJacksonParser.scala b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/json/FlintJacksonParser.scala
index a9e9122fb..7e2ca8c49 100644
--- a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/json/FlintJacksonParser.scala
+++ b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/json/FlintJacksonParser.scala
@@ -21,6 +21,7 @@ import org.apache.spark.sql.catalyst.json.{JacksonUtils, JsonFilters, JSONOption
 import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData, BadRecordException, DateFormatter, DateTimeUtils, GenericArrayData, IntervalUtils, MapData, PartialResultException, RebaseDateTime, TimestampFormatter}
 import org.apache.spark.sql.catalyst.util.LegacyDateFormats.FAST_DATE_FORMAT
 import org.apache.spark.sql.errors.QueryExecutionErrors
+import org.apache.spark.sql.flint.datatype.FlintDataType
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.Filter
 import org.apache.spark.sql.types._
@@ -448,13 +449,33 @@ class FlintJacksonParser(
     var badRecordException: Option[Throwable] = None
     var skipRow = false
 
+    // Build mapping from JSON key to sequence of schema field indices.
+    val fieldMapping: Map[String, Seq[Int]] = {
+      schema.fields.zipWithIndex.foldLeft(Map.empty[String, Seq[Int]]) {
+        case (acc, (field, idx)) =>
+          val jsonKey = if (field.metadata.contains(FlintDataType.METADATA_ALIAS_PATH_NAME)) {
+            field.metadata.getString(FlintDataType.METADATA_ALIAS_PATH_NAME)
+          } else {
+            field.name
+          }
+          acc.updated(jsonKey, acc.getOrElse(jsonKey, Seq.empty[Int]) :+ idx)
+      }
+    }
+
     structFilters.reset()
     while (!skipRow && nextUntil(parser, JsonToken.END_OBJECT)) {
-      schema.getFieldIndex(parser.getCurrentName) match {
-        case Some(index) =>
+      fieldMapping.get(parser.getCurrentName) match {
+        case Some(indices) =>
           try {
-            row.update(index, fieldConverters(index).apply(parser))
-            skipRow = structFilters.skipRow(row, index)
+            // All fields mapped to this JSON key share the same data type.
+            val fieldValue = fieldConverters(indices.head).apply(parser)
+            // Assign the parsed value to all schema fields mapped to this JSON key.
+            indices.foreach { idx =>
+              row.update(idx, fieldValue)
+              if (structFilters.skipRow(row, idx)) {
+                skipRow = true
+              }
+            }
           } catch {
             case e: SparkUpgradeException => throw e
             case NonFatal(e) if isRoot =>
diff --git a/flint-spark-integration/src/test/scala/org/apache/spark/sql/flint/datatype/FlintDataTypeSuite.scala b/flint-spark-integration/src/test/scala/org/apache/spark/sql/flint/datatype/FlintDataTypeSuite.scala
index 312f3a5a1..c10ec5841 100644
--- a/flint-spark-integration/src/test/scala/org/apache/spark/sql/flint/datatype/FlintDataTypeSuite.scala
+++ b/flint-spark-integration/src/test/scala/org/apache/spark/sql/flint/datatype/FlintDataTypeSuite.scala
@@ -279,4 +279,45 @@ class FlintDataTypeSuite extends FlintSuite with Matchers {
     val data: JValue = JsonMethods.parse(json)
     JsonMethods.compact(JsonMethods.render(data))
   }
+
+  test("alias field deserialize") {
+    val flintDataType =
+      """{
+        |  "properties": {
+        |    "distance": {
+        |      "type": "long"
+        |    },
+        |    "route_length_miles": {
+        |      "type": "alias",
+        |      "path": "distance"
+        |    },
+        |    "transit_mode": {
+        |      "type": "keyword"
+        |    }
+        |  }
+        |}""".stripMargin
+
+    val expectedStructType = StructType(
+      Seq(
+        StructField("distance", LongType, true),
+        StructField("transit_mode", StringType, true),
+        StructField(
+          "route_length_miles",
+          LongType,
+          true,
+          new MetadataBuilder().putString("aliasPath", "distance").build())))
+
+    val deserialized = FlintDataType.deserialize(flintDataType)
+
+    deserialized.fields should have length (3)
+    deserialized.fields(0) shouldEqual expectedStructType.fields(0)
+    deserialized.fields(1) shouldEqual expectedStructType.fields(1)
+
+    val aliasField = deserialized.fields(2)
+    aliasField.name shouldEqual "route_length_miles"
+    aliasField.dataType shouldEqual LongType
+    aliasField.metadata.contains("aliasPath") shouldBe true
+    aliasField.metadata.getString("aliasPath") shouldEqual "distance"
+  }
+
 }
diff --git a/integ-test/src/integration/scala/org/apache/spark/opensearch/table/OpenSearchTableQueryITSuite.scala b/integ-test/src/integration/scala/org/apache/spark/opensearch/table/OpenSearchTableQueryITSuite.scala
index 44011366a..4ac6991fb 100644
--- a/integ-test/src/integration/scala/org/apache/spark/opensearch/table/OpenSearchTableQueryITSuite.scala
+++ b/integ-test/src/integration/scala/org/apache/spark/opensearch/table/OpenSearchTableQueryITSuite.scala
@@ -47,4 +47,22 @@ class OpenSearchTableQueryITSuite extends OpenSearchCatalogSuite with FlintPPLSu
       }
     }
   }
+
+  test("Query index with alias data type") {
+    val index1 = "t0001"
+    withIndexName(index1) {
+      indexWithAlias(index1)
+      // select original field and alias field
+      var df = spark.sql(s"""SELECT id, alias FROM ${catalogName}.default.$index1""")
+      checkAnswer(df, Seq(Row(1, 1), Row(2, 2)))
+
+      // filter on alias field
+      df = spark.sql(s"""SELECT id, alias FROM ${catalogName}.default.$index1 WHERE alias=1""")
+      checkAnswer(df, Row(1, 1))
+
+      // filter on original field
+      df = spark.sql(s"""SELECT id, alias FROM ${catalogName}.default.$index1 WHERE id=1""")
+      checkAnswer(df, Row(1, 1))
+    }
+  }
 }
diff --git a/integ-test/src/integration/scala/org/opensearch/flint/OpenSearchSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/OpenSearchSuite.scala
index 35c700aca..653cd3e8c 100644
--- a/integ-test/src/integration/scala/org/opensearch/flint/OpenSearchSuite.scala
+++ b/integ-test/src/integration/scala/org/opensearch/flint/OpenSearchSuite.scala
@@ -123,6 +123,22 @@ trait OpenSearchSuite extends BeforeAndAfterAll {
     index(indexName, oneNodeSetting, mappings, docs)
   }
 
+  def indexWithAlias(indexName: String): Unit = {
+    val mappings = """{
+                     |  "properties": {
+                     |    "id": {
+                     |      "type": "integer"
+                     |    },
+                     |    "alias": {
+                     |      "type": "alias",
+                     |      "path": "id"
+                     |    }
+                     |  }
+                     |}""".stripMargin
+    val docs = Seq("""{"id": 1}""", """{"id": 2}""")
+    index(indexName, oneNodeSetting, mappings, docs)
+  }
+
   def index(index: String, settings: String, mappings: String, docs: Seq[String]): Unit = {
     openSearchClient.indices.create(
       new CreateIndexRequest(index)
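
For reference, a minimal usage sketch of the alias behavior introduced above, in the same style as the docs snippets. It assumes a hypothetical index `trips` whose mapping defines `route_length_miles` as an alias of the long field `distance` (the same shape as the `FlintDataTypeSuite` example) and the `dev` catalog used elsewhere in the docs:

```scala
// Hypothetical index "trips" mapped as:
//   distance:           { "type": "long" }
//   route_length_miles: { "type": "alias", "path": "distance" }
// The alias column inherits LongType from the referenced field, so it can be
// selected and filtered like any other column; both columns read the same
// underlying "distance" value from each document.
val df = spark.sql(
  "SELECT distance, route_length_miles FROM dev.default.trips WHERE route_length_miles > 10")
df.printSchema()
df.show()
```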