Expand ppl command #868

Merged
merged 15 commits into from
Nov 7, 2024
37 changes: 24 additions & 13 deletions docs/ppl-lang/PPL-Example-Commands.md
Assumptions: `a`, `b` are fields of table outer, `c`, `d` are fields of table inner

_- **Limitation: another command usage of (relation) subquery is in `appendcols` commands which is unsupported**_

---
#### Experimental Commands:

#### **fillnull**
[See additional command details](ppl-fillnull-command.md)
```sql
- `source=accounts | fillnull fields status_code=101`
- `source=accounts | fillnull fields request_path='/not_found', timestamp='*'`
- `source=accounts | fillnull using field1=101`
- `source=accounts | fillnull using field1=concat(field2, field3), field4=2*pi()*field5`
- `source=accounts | fillnull using field1=concat(field2, field3), field4=2*pi()*field5, field6 = 'N/A'`
```

#### **expand**
[See additional command details](ppl-expand-command.md)
```sql
- `source = table | expand field_with_array as array_list`
- `source = table | expand employee | stats max(salary) as max by state, company`
- `source = table | expand employee as worker | stats max(salary) as max by state, company`
- `source = table | expand employee as worker | eval bonus = salary * 3 | fields worker, bonus`
- `source = table | expand employee | parse description '(?<email>.+@.+)' | fields employee, email`
- `source = table | eval array=json_array(1, 2, 3) | expand array as uid | fields name, occupation, uid`
- `source = table | expand multi_valueA as multiA | expand multi_valueB as multiB`
```

#### Correlation Commands:
[See additional command details](ppl-correlation-command.md)

> ppl-correlation-command is an experimental command - it may be removed in future versions

---
### Planned Commands:

#### **fillnull**
[See additional command details](ppl-fillnull-command.md)
```sql
- `source=accounts | fillnull fields status_code=101`
- `source=accounts | fillnull fields request_path='/not_found', timestamp='*'`
- `source=accounts | fillnull using field1=101`
- `source=accounts | fillnull using field1=concat(field2, field3), field4=2*pi()*field5`
- `source=accounts | fillnull using field1=concat(field2, field3), field4=2*pi()*field5, field6 = 'N/A'`
```
2 changes: 2 additions & 0 deletions docs/ppl-lang/README.md
For additional examples see the [documentation](PPL-Example-Commands.md).
- [`correlation commands`](ppl-correlation-command.md)

- [`trendline commands`](ppl-trendline-command.md)

- [`expand commands`](ppl-expand-command.md)

* **Functions**

45 changes: 45 additions & 0 deletions docs/ppl-lang/ppl-expand-command.md
## PPL `expand` command

### Description
The `expand` command flattens a field of one of the following types:
- `Array<Any>`
- `Map<Any>`


### Syntax
`expand <field> [as alias]`

* field: the field to expand (explode). It must be of a supported type.
* alias: optional name to use for the expanded values instead of the original field name.

### Usage Guidelines
The expand command produces a row for each element in the specified array or map field:
- Array elements become individual rows.
- Map key-value pairs are split into separate rows, one row per key-value pair.
- When an alias is provided, the expanded values appear under the alias instead of the original field name.
- The command can be combined with other commands, such as `stats`, `eval`, and `parse`, to manipulate or extract data post-expansion.
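The map case above can be illustrated with a plain-Scala sketch (hypothetical names, not the engine's implementation): each key-value pair of the map field becomes its own output row, and an alias would simply rename the resulting column.

```scala
// Sketch: expanding a Map-typed field emits one row per key-value pair.
// `rows` pairs a scalar field ("name") with a map field to be expanded.
def expandMap(rows: Seq[(String, Map[String, Int])]): Seq[(String, (String, Int))] =
  rows.flatMap { case (name, m) =>
    m.toSeq.map(kv => (name, kv)) // one output row per key-value pair
  }

val input = Seq(("host1", Map("cpu" -> 80, "mem" -> 60)))
// yields two rows: ("host1", ("cpu", 80)) and ("host1", ("mem", 60))
```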

### Examples:
- `source = table | expand employee | stats max(salary) as max by state, company`
- `source = table | expand employee as worker | stats max(salary) as max by state, company`
- `source = table | expand employee as worker | eval bonus = salary * 3 | fields worker, bonus`
- `source = table | expand employee | parse description '(?<email>.+@.+)' | fields employee, email`
- `source = table | eval array=json_array(1, 2, 3) | expand array as uid | fields name, occupation, uid`
- `source = table | expand multi_valueA as multiA | expand multi_valueB as multiB`

- The expand command can be combined with other commands such as `eval` and `stats`.
- Using multiple expand commands creates a cartesian product of the elements within each array or map.
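The cartesian-product behavior of chained expands can be sketched in plain Scala (hypothetical names, not the engine's implementation): two successive expands behave like nested `flatMap`s, pairing every element of the first array with every element of the second.

```scala
// Hypothetical row shape mirroring the multi_valueA/multi_valueB examples.
case class InputRow(id: Int, multiValueA: Seq[String], multiValueB: Seq[String])

// Two successive expands: every element of multiValueA is paired with
// every element of multiValueB, i.e. a cartesian product per input row.
def expandTwice(rows: Seq[InputRow]): Seq[(Int, String, String)] =
  for {
    r <- rows
    a <- r.multiValueA
    b <- r.multiValueB
  } yield (r.id, a, b)

val rows = Seq(InputRow(1, Seq("a1", "a2"), Seq("b1", "b2", "b3")))
// 2 x 3 = 6 output rows for the single input row
```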

### Effective SQL push-down query
The expand command is translated into an equivalent SQL operation using `LATERAL VIEW explode`, allowing arrays or maps to be exploded efficiently at the SQL query level.

```sql
SELECT customer, exploded_productId
FROM table
LATERAL VIEW explode(productId) AS exploded_productId
```
Here the `explode` function offers the following behavior:
- it is a column operation that returns a new column
- it creates a new row for every element in the exploded column
- `null` elements inside the exploded field are ignored (no row is created for them)
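The null-skipping semantics above can be sketched in plain Scala (hypothetical names, not Spark's actual `explode` implementation): one output row per array element, with null arrays and null elements producing no rows.

```scala
// Sketch of explode semantics over (customer, productIds) rows:
// each non-null element of productIds becomes its own row.
def explodeField[A](rows: Seq[(String, Seq[A])]): Seq[(String, A)] =
  rows.flatMap { case (customer, productIds) =>
    Option(productIds)
      .getOrElse(Seq.empty)  // a null array yields no rows
      .filter(_ != null)     // null elements are skipped too
      .map(id => (customer, id))
  }

val input = Seq(
  ("alice", Seq("p1", "p2")),
  ("bob", Seq("p3", null)),
  ("carol", null))
// yields rows only for (alice,p1), (alice,p2), (bob,p3)
```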
trait FlintSparkSuite extends QueryTest with FlintSuite with OpenSearchSuite wit
|""".stripMargin)
}

protected def createMultiColumnArrayTable(testTable: String): Unit = {
// CSV doesn't support struct field
sql(s"""
| CREATE TABLE $testTable
| (
| int_col INT,
| multi_valueA Array<STRUCT<name: STRING, value: INT>>,
| multi_valueB Array<STRUCT<name: STRING, value: INT>>
| )
| USING JSON
|""".stripMargin)

sql(s"""
| INSERT INTO $testTable
| VALUES
| ( 1, array(STRUCT("1_one", 1), STRUCT(null, 11), STRUCT("1_three", null)), array(STRUCT("2_Monday", 2), null) ),
| ( 2, array(STRUCT("2_Monday", 2), null) , array(STRUCT("3_third", 3), STRUCT("3_4th", 4)) ),
| ( 3, array(STRUCT("3_third", 3), STRUCT("3_4th", 4)) , array(STRUCT("1_one", 1))),
| ( 4, null, array(STRUCT("1_one", 1)))
|""".stripMargin)
}

protected def createTableIssue112(testTable: String): Unit = {
sql(s"""
| CREATE TABLE $testTable (
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/
package org.opensearch.flint.spark.ppl

import java.nio.file.Files

import scala.collection.mutable

import org.opensearch.sql.ppl.utils.DataTypeTransformer.seq

import org.apache.spark.sql.{QueryTest, Row}
import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedFunction, UnresolvedRelation, UnresolvedStar}
import org.apache.spark.sql.catalyst.expressions.{Alias, EqualTo, Explode, GeneratorOuter, Literal, Or}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.streaming.StreamTest

class FlintSparkPPLExpandITSuite
extends QueryTest
with LogicalPlanTestUtils
with FlintPPLSuite
with StreamTest {

private val testTable = "flint_ppl_test"
private val occupationTable = "spark_catalog.default.flint_ppl_flat_table_test"
private val structNestedTable = "spark_catalog.default.flint_ppl_struct_nested_test"
private val structTable = "spark_catalog.default.flint_ppl_struct_test"
private val multiValueTable = "spark_catalog.default.flint_ppl_multi_value_test"
private val multiArraysTable = "spark_catalog.default.flint_ppl_multi_array_test"
private val tempFile = Files.createTempFile("jsonTestData", ".json")

override def beforeAll(): Unit = {
super.beforeAll()

// Create test table
createNestedJsonContentTable(tempFile, testTable)
createOccupationTable(occupationTable)
createStructNestedTable(structNestedTable)
createStructTable(structTable)
createMultiValueStructTable(multiValueTable)
createMultiColumnArrayTable(multiArraysTable)
}

protected override def afterEach(): Unit = {
super.afterEach()
// Stop all streaming jobs if any
spark.streams.active.foreach { job =>
job.stop()
job.awaitTermination()
}
}

override def afterAll(): Unit = {
super.afterAll()
Files.deleteIfExists(tempFile)
}

test("expand for eval field of an array") {
val frame = sql(
s""" source = $occupationTable | eval array=json_array(1, 2, 3) | expand array as uid | fields name, occupation, uid
""".stripMargin)

val results: Array[Row] = frame.collect()
val expectedResults: Array[Row] = Array(
Row("Jake", "Engineer", 1),
Row("Jake", "Engineer", 2),
Row("Jake", "Engineer", 3),
Row("Hello", "Artist", 1),
Row("Hello", "Artist", 2),
Row("Hello", "Artist", 3),
Row("John", "Doctor", 1),
Row("John", "Doctor", 2),
Row("John", "Doctor", 3),
Row("David", "Doctor", 1),
Row("David", "Doctor", 2),
Row("David", "Doctor", 3),
Row("David", "Unemployed", 1),
Row("David", "Unemployed", 2),
Row("David", "Unemployed", 3),
Row("Jane", "Scientist", 1),
Row("Jane", "Scientist", 2),
Row("Jane", "Scientist", 3))

// Compare the results
assert(results.toSet == expectedResults.toSet)

val logicalPlan: LogicalPlan = frame.queryExecution.logical
// expected plan
val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_flat_table_test"))
val jsonFunc =
UnresolvedFunction("array", Seq(Literal(1), Literal(2), Literal(3)), isDistinct = false)
val aliasA = Alias(jsonFunc, "array")()
val project = Project(seq(UnresolvedStar(None), aliasA), table)
val generate = Generate(
Explode(UnresolvedAttribute("array")),
seq(),
false,
None,
seq(UnresolvedAttribute("uid")),
project)
val dropSourceColumn =
DataFrameDropColumns(Seq(UnresolvedAttribute("array")), generate)
val expectedPlan = Project(
seq(
UnresolvedAttribute("name"),
UnresolvedAttribute("occupation"),
UnresolvedAttribute("uid")),
dropSourceColumn)
comparePlans(expectedPlan, logicalPlan, checkAnalysis = false)
}

test("expand for structs") {
val frame = sql(
s""" source = $multiValueTable | expand multi_value AS exploded_multi_value | fields exploded_multi_value
""".stripMargin)

val results: Array[Row] = frame.collect()
val expectedResults: Array[Row] = Array(
Row(Row("1_one", 1)),
Row(Row(null, 11)),
Row(Row("1_three", null)),
Row(Row("2_Monday", 2)),
Row(null),
Row(Row("3_third", 3)),
Row(Row("3_4th", 4)),
Row(null))
// Compare the results
assert(results.toSet == expectedResults.toSet)

val logicalPlan: LogicalPlan = frame.queryExecution.logical
// expected plan
val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_multi_value_test"))
val generate = Generate(
Explode(UnresolvedAttribute("multi_value")),
seq(),
outer = false,
None,
seq(UnresolvedAttribute("exploded_multi_value")),
table)
val dropSourceColumn =
DataFrameDropColumns(Seq(UnresolvedAttribute("multi_value")), generate)
val expectedPlan = Project(Seq(UnresolvedAttribute("exploded_multi_value")), dropSourceColumn)
comparePlans(logicalPlan, expectedPlan, checkAnalysis = false)
}

test("expand for array of structs") {
val frame = sql(s"""
| source = $testTable
| | where country = 'England' or country = 'Poland'
| | expand bridges
| | fields bridges
| """.stripMargin)

val results: Array[Row] = frame.collect()
val expectedResults: Array[Row] = Array(
Row(mutable.WrappedArray.make(Array(Row(801, "Tower Bridge"), Row(928, "London Bridge")))),
Row(mutable.WrappedArray.make(Array(Row(801, "Tower Bridge"), Row(928, "London Bridge"))))
// Row(null)) -> in case of outerGenerator = GeneratorOuter(Explode(UnresolvedAttribute("bridges"))) it will include the `null` row
)

// Compare the results
assert(results.toSet == expectedResults.toSet)
val logicalPlan: LogicalPlan = frame.queryExecution.logical
val table = UnresolvedRelation(Seq("flint_ppl_test"))
val filter = Filter(
Or(
EqualTo(UnresolvedAttribute("country"), Literal("England")),
EqualTo(UnresolvedAttribute("country"), Literal("Poland"))),
table)
val generate =
Generate(Explode(UnresolvedAttribute("bridges")), seq(), outer = false, None, seq(), filter)
val expectedPlan = Project(Seq(UnresolvedAttribute("bridges")), generate)
comparePlans(logicalPlan, expectedPlan, checkAnalysis = false)
}

test("expand for array of structs with alias") {
val frame = sql(s"""
| source = $testTable
| | where country = 'England'
| | expand bridges as britishBridges
| | fields britishBridges
| """.stripMargin)

val results: Array[Row] = frame.collect()
val expectedResults: Array[Row] = Array(
Row(Row(801, "Tower Bridge")),
Row(Row(928, "London Bridge")),
Row(Row(801, "Tower Bridge")),
Row(Row(928, "London Bridge")))
// Compare the results
assert(results.toSet == expectedResults.toSet)
val logicalPlan: LogicalPlan = frame.queryExecution.logical
val table = UnresolvedRelation(Seq("flint_ppl_test"))
val filter = Filter(EqualTo(UnresolvedAttribute("country"), Literal("England")), table)
val generate = Generate(
Explode(UnresolvedAttribute("bridges")),
seq(),
outer = false,
None,
seq(UnresolvedAttribute("britishBridges")),
filter)
val dropColumn =
DataFrameDropColumns(Seq(UnresolvedAttribute("bridges")), generate)
val expectedPlan = Project(Seq(UnresolvedAttribute("britishBridges")), dropColumn)

comparePlans(logicalPlan, expectedPlan, checkAnalysis = false)
}

test("expand multi columns array table") {
val frame = sql(s"""
| source = $multiArraysTable
| | expand multi_valueA as multiA
| | expand multi_valueB as multiB
| """.stripMargin)

val results: Array[Row] = frame.collect()
val expectedResults: Array[Row] = Array(
Row(1, Row("1_one", 1), Row("2_Monday", 2)),
Row(1, Row("1_one", 1), null),
Row(1, Row(null, 11), Row("2_Monday", 2)),
Row(1, Row(null, 11), null),
Row(1, Row("1_three", null), Row("2_Monday", 2)),
Row(1, Row("1_three", null), null),
Row(2, Row("2_Monday", 2), Row("3_third", 3)),
Row(2, Row("2_Monday", 2), Row("3_4th", 4)),
Row(2, null, Row("3_third", 3)),
Row(2, null, Row("3_4th", 4)),
Row(3, Row("3_third", 3), Row("1_one", 1)),
Row(3, Row("3_4th", 4), Row("1_one", 1)))
// Compare the results
assert(results.toSet == expectedResults.toSet)

val logicalPlan: LogicalPlan = frame.queryExecution.logical
val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_multi_array_test"))
val generatorA = Explode(UnresolvedAttribute("multi_valueA"))
val generateA =
Generate(generatorA, seq(), false, None, seq(UnresolvedAttribute("multiA")), table)
val dropSourceColumnA =
DataFrameDropColumns(Seq(UnresolvedAttribute("multi_valueA")), generateA)
val generatorB = Explode(UnresolvedAttribute("multi_valueB"))
val generateB = Generate(
generatorB,
seq(),
false,
None,
seq(UnresolvedAttribute("multiB")),
dropSourceColumnA)
val dropSourceColumnB =
DataFrameDropColumns(Seq(UnresolvedAttribute("multi_valueB")), generateB)
val expectedPlan = Project(seq(UnresolvedStar(None)), dropSourceColumnB)
comparePlans(expectedPlan, logicalPlan, checkAnalysis = false)

}
}
KMEANS: 'KMEANS';
AD: 'AD';
ML: 'ML';
FILLNULL: 'FILLNULL';
EXPAND: 'EXPAND';
FLATTEN: 'FLATTEN';
TRENDLINE: 'TRENDLINE';
